Skip to content

Commit e7a72b4

Browse files
authored
#2109 schema infer max (#2159)
* Set a default schema-infer max record count.
* Fix an unrelated issue — "error: format argument must be a string literal" — encountered during `cargo test`.
* Fix clippy lints, same as https://github.com/apache/arrow-datafusion/pull/1885/files, which is already in master.
1 parent ca765d5 commit e7a72b4

File tree

6 files changed

+26
-15
lines changed

6 files changed

+26
-15
lines changed

datafusion/fuzz-utils/src/lib.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub use env_logger;
2626
pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
2727
batches
2828
.iter()
29-
.map(|batch| {
29+
.flat_map(|batch| {
3030
assert_eq!(batch.num_columns(), 1);
3131
batch
3232
.column(0)
@@ -35,16 +35,14 @@ pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
3535
.unwrap()
3636
.iter()
3737
})
38-
.flatten()
3938
.collect()
4039
}
4140

4241
/// extract values from batches and sort them
4342
pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
4443
let mut values: Vec<_> = partitions
4544
.iter()
46-
.map(|batches| batches_to_vec(batches).into_iter())
47-
.flatten()
45+
.flat_map(|batches| batches_to_vec(batches).into_iter())
4846
.collect();
4947

5048
values.sort_unstable();
@@ -60,14 +58,13 @@ pub fn add_empty_batches(
6058

6159
batches
6260
.into_iter()
63-
.map(|batch| {
61+
.flat_map(|batch| {
6462
// insert 0, or 1 empty batches before and after the current batch
6563
let empty_batch = RecordBatch::new_empty(schema.clone());
6664
std::iter::repeat(empty_batch.clone())
6765
.take(rng.gen_range(0..2))
6866
.chain(std::iter::once(batch))
6967
.chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
7068
})
71-
.flatten()
7269
.collect()
7370
}

datafusion/src/datasource/file_format/csv.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use async_trait::async_trait;
2626
use futures::StreamExt;
2727

2828
use super::FileFormat;
29+
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
2930
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
3031
use crate::error::Result;
3132
use crate::logical_plan::Expr;
@@ -46,7 +47,7 @@ pub struct CsvFormat {
4647
impl Default for CsvFormat {
4748
fn default() -> Self {
4849
Self {
49-
schema_infer_max_rec: None,
50+
schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
5051
has_header: true,
5152
delimiter: b',',
5253
}
@@ -55,7 +56,7 @@ impl Default for CsvFormat {
5556

5657
impl CsvFormat {
5758
/// Set a limit in terms of records to scan to infer the schema
58-
/// - default to `None` (no limit)
59+
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
5960
pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
6061
self.schema_infer_max_rec = max_rec;
6162
self

datafusion/src/datasource/file_format/json.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use futures::StreamExt;
3030

3131
use super::FileFormat;
3232
use super::FileScanConfig;
33+
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
3334
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
3435
use crate::error::Result;
3536
use crate::logical_plan::Expr;
@@ -40,14 +41,22 @@ use crate::physical_plan::Statistics;
4041
/// The default file extension of json files
4142
pub const DEFAULT_JSON_EXTENSION: &str = ".json";
4243
/// New line delimited JSON `FileFormat` implementation.
43-
#[derive(Debug, Default)]
44+
#[derive(Debug)]
4445
pub struct JsonFormat {
4546
schema_infer_max_rec: Option<usize>,
4647
}
4748

49+
impl Default for JsonFormat {
50+
fn default() -> Self {
51+
Self {
52+
schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
53+
}
54+
}
55+
}
56+
4857
impl JsonFormat {
4958
/// Set a limit in terms of records to scan to infer the schema
50-
/// - defaults to `None` (no limit)
59+
/// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
5160
pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
5261
self.schema_infer_max_rec = max_rec;
5362
self

datafusion/src/datasource/file_format/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
//! Module containing helper methods for the various file formats
1919
20+
/// default max records to scan to infer the schema
21+
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
22+
2023
pub mod avro;
2124
pub mod csv;
2225
pub mod json;

datafusion/src/execution/options.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::sync::Arc;
2222
use arrow::datatypes::{Schema, SchemaRef};
2323

2424
use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
25+
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
2526
use crate::datasource::{
2627
file_format::{avro::AvroFormat, csv::CsvFormat},
2728
listing::ListingOptions,
@@ -40,7 +41,7 @@ pub struct CsvReadOptions<'a> {
4041
/// An optional schema representing the CSV files. If None, CSV reader will try to infer it
4142
/// based on data in file.
4243
pub schema: Option<&'a Schema>,
43-
/// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
44+
/// Max number of rows to read from CSV files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
4445
pub schema_infer_max_records: usize,
4546
/// File extension; only files with this extension are selected for data input.
4647
/// Defaults to ".csv".
@@ -59,7 +60,7 @@ impl<'a> CsvReadOptions<'a> {
5960
Self {
6061
has_header: true,
6162
schema: None,
62-
schema_infer_max_records: 1000,
63+
schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
6364
delimiter: b',',
6465
file_extension: ".csv",
6566
}
@@ -161,7 +162,7 @@ pub struct NdJsonReadOptions<'a> {
161162
/// The data source schema.
162163
pub schema: Option<SchemaRef>,
163164

164-
/// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
165+
/// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
165166
pub schema_infer_max_records: usize,
166167

167168
/// File extension; only files with this extension are selected for data input.
@@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
173174
fn default() -> Self {
174175
Self {
175176
schema: None,
176-
schema_infer_max_records: 1000,
177+
schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
177178
file_extension: DEFAULT_JSON_EXTENSION,
178179
}
179180
}

datafusion/tests/sql/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -883,7 +883,7 @@ async fn nyc() -> Result<()> {
883883
},
884884
_ => unreachable!(),
885885
},
886-
_ => unreachable!(false),
886+
_ => unreachable!("{}", false),
887887
}
888888

889889
Ok(())

0 commit comments

Comments (0)