
#2109 schema infer max (#2159)


Merged · 3 commits · Apr 5, 2022
9 changes: 3 additions & 6 deletions datafusion/fuzz-utils/src/lib.rs
@@ -26,7 +26,7 @@ pub use env_logger
 pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
     batches
         .iter()
-        .map(|batch| {
+        .flat_map(|batch| {
             assert_eq!(batch.num_columns(), 1);
             batch
                 .column(0)
@@ -35,16 +35,14 @@ pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
                 .unwrap()
                 .iter()
         })
-        .flatten()
         .collect()
 }

 /// extract values from batches and sort them
 pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
     let mut values: Vec<_> = partitions
         .iter()
-        .map(|batches| batches_to_vec(batches).into_iter())
-        .flatten()
+        .flat_map(|batches| batches_to_vec(batches).into_iter())
         .collect();

     values.sort_unstable();
@@ -60,14 +58,13 @@ pub fn add_empty_batches(

     batches
         .into_iter()
-        .map(|batch| {
+        .flat_map(|batch| {
             // insert 0, or 1 empty batches before and after the current batch
             let empty_batch = RecordBatch::new_empty(schema.clone());
             std::iter::repeat(empty_batch.clone())
                 .take(rng.gen_range(0..2))
                 .chain(std::iter::once(batch))
                 .chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
         })
-        .flatten()
         .collect()
 }
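
All three hunks are the same mechanical refactor: a `.map(..)` immediately followed by `.flatten()` collapses into a single `.flat_map(..)`, which is what clippy's `map_flatten` lint suggests. A standalone sketch of the equivalence, on toy data rather than `RecordBatch`es:

```rust
fn main() {
    let batches = vec![vec![1, 2], vec![3], vec![]];

    // Before: map each batch to an iterator, then flatten the nesting.
    let a: Vec<i32> = batches
        .iter()
        .map(|b| b.iter().copied())
        .flatten()
        .collect();

    // After: flat_map does both steps in one adapter.
    let b: Vec<i32> = batches
        .iter()
        .flat_map(|b| b.iter().copied())
        .collect();

    // Same sequence either way.
    assert_eq!(a, b);
}
```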
5 changes: 3 additions & 2 deletions datafusion/src/datasource/file_format/csv.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
 use futures::StreamExt;

 use super::FileFormat;
+use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
 use crate::logical_plan::Expr;
@@ -46,7 +47,7 @@ pub struct CsvFormat {
 impl Default for CsvFormat {
     fn default() -> Self {
         Self {
-            schema_infer_max_rec: None,
+            schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
             has_header: true,
             delimiter: b',',
         }
@@ -55,7 +56,7 @@ impl Default for CsvFormat {

 impl CsvFormat {
     /// Set a limit in terms of records to scan to infer the schema
-    /// - default to `None` (no limit)
+    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
     pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
         self.schema_infer_max_rec = max_rec;
         self
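
Seen from the caller's side, the effect of the `CsvFormat` change is that schema inference is now bounded by default. A usage sketch (module paths per this PR's tree; the builder method is the one shown in the hunk above):

```rust
use datafusion::datasource::file_format::csv::CsvFormat;

fn configure_csv() {
    // `default()` now caps schema inference at 1000 records
    // (DEFAULT_SCHEMA_INFER_MAX_RECORD) instead of scanning every record.
    let _capped = CsvFormat::default();

    // Passing `None` explicitly restores the old unbounded behavior.
    let _unbounded = CsvFormat::default().with_schema_infer_max_rec(None);

    // A smaller cap trades inference fidelity for faster planning.
    let _quick = CsvFormat::default().with_schema_infer_max_rec(Some(100));
}
```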
13 changes: 11 additions & 2 deletions datafusion/src/datasource/file_format/json.rs
@@ -30,6 +30,7 @@ use futures::StreamExt;

 use super::FileFormat;
 use super::FileScanConfig;
+use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
 use crate::logical_plan::Expr;
@@ -40,14 +41,22 @@ use crate::physical_plan::Statistics;
 /// The default file extension of json files
 pub const DEFAULT_JSON_EXTENSION: &str = ".json";
 /// New line delimited JSON `FileFormat` implementation.
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct JsonFormat {
     schema_infer_max_rec: Option<usize>,
 }

+impl Default for JsonFormat {
+    fn default() -> Self {
+        Self {
+            schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
+        }
+    }
+}
+
 impl JsonFormat {
     /// Set a limit in terms of records to scan to infer the schema
-    /// - defaults to `None` (no limit)
+    /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
     pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
         self.schema_infer_max_rec = max_rec;
         self
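
The switch from `#[derive(Debug, Default)]` to a hand-written `Default` is needed because the derive fills an `Option` field with `None`; a manual impl is the standard way to give a field a non-`None` starting value. A generic sketch of that pattern, independent of DataFusion:

```rust
const DEFAULT_LIMIT: usize = 1000;

struct Config {
    limit: Option<usize>,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            limit: Some(DEFAULT_LIMIT),
        }
    }
}

fn main() {
    // With #[derive(Default)] this field would start as None;
    // the manual impl lets it start at Some(1000).
    assert_eq!(Config::default().limit, Some(1000));
}
```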
3 changes: 3 additions & 0 deletions datafusion/src/datasource/file_format/mod.rs
@@ -17,6 +17,9 @@

 //! Module containing helper methods for the various file formats

+/// default max records to scan to infer the schema
+pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
+
 pub mod avro;
 pub mod csv;
 pub mod json;
9 changes: 5 additions & 4 deletions datafusion/src/execution/options.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
 use arrow::datatypes::{Schema, SchemaRef};

 use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
+use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::{
     file_format::{avro::AvroFormat, csv::CsvFormat},
     listing::ListingOptions,
@@ -40,7 +41,7 @@ pub struct CsvReadOptions<'a> {
     /// An optional schema representing the CSV files. If None, CSV reader will try to infer it
     /// based on data in file.
     pub schema: Option<&'a Schema>,
-    /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
+    /// Max number of rows to read from CSV files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
     pub schema_infer_max_records: usize,
     /// File extension; only files with this extension are selected for data input.
     /// Defaults to ".csv".
@@ -59,7 +60,7 @@ impl<'a> CsvReadOptions<'a> {
         Self {
             has_header: true,
             schema: None,
-            schema_infer_max_records: 1000,
+            schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
             delimiter: b',',
             file_extension: ".csv",
         }
@@ -161,7 +162,7 @@ pub struct NdJsonReadOptions<'a> {
     /// The data source schema.
     pub schema: Option<SchemaRef>,

-    /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
+    /// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
     pub schema_infer_max_records: usize,

     /// File extension; only files with this extension are selected for data input.
@@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
     fn default() -> Self {
         Self {
             schema: None,
-            schema_infer_max_records: 1000,
+            schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
             file_extension: DEFAULT_JSON_EXTENSION,
         }
     }
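
Both read-options structs now draw their default from the shared constant instead of a hard-coded `1000` (which also fixes the copy-pasted "CSV" in the NdJson doc comment). A sketch of what that looks like to a caller, assuming the `CsvReadOptions::new()` constructor from the hunk above and that both types are importable from `datafusion::execution::options`:

```rust
use datafusion::execution::options::{CsvReadOptions, NdJsonReadOptions};

fn options_sketch() {
    // Both defaults are now derived from DEFAULT_SCHEMA_INFER_MAX_RECORD.
    let csv = CsvReadOptions::new();
    let json = NdJsonReadOptions::default();
    assert_eq!(csv.schema_infer_max_records, json.schema_infer_max_records);

    // The field is public, so callers can still widen the sample.
    let mut wide = CsvReadOptions::new();
    wide.schema_infer_max_records = 10_000;
}
```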
2 changes: 1 addition & 1 deletion datafusion/tests/sql/mod.rs
@@ -883,7 +883,7 @@ async fn nyc() -> Result<()> {
             },
             _ => unreachable!(),
         },
-        _ => unreachable!(false),
+        _ => unreachable!("{}", false),
     }

     Ok(())
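
The final hunk is an edition-compatibility fix unrelated to schema inference: since Rust 2021, the panic-family macros (`panic!`, `unreachable!`, and friends) require their first argument to be a format string, so a bare `unreachable!(false)` is rejected (older editions flag it via the `non_fmt_panics` lint). A minimal sketch of the corrected pattern:

```rust
fn describe(flag: bool) -> &'static str {
    match flag {
        true => "set",
        // `unreachable!(false)` would not compile on edition 2021 because
        // `false` is not a format string; routing the value through "{}"
        // keeps an equivalent panic message.
        false => unreachable!("{}", false),
    }
}

fn main() {
    assert_eq!(describe(true), "set");
}
```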