Skip to content

Commit 6e0097d

Browse files
authored
Expose parquet reader settings using normal DataFusion ConfigOptions (#3822)
* Expose parquet reader settings as DataFusion config settings * fix logical conflict * Update tests
1 parent e8ea218 commit 6e0097d

File tree

16 files changed

+250
-102
lines changed

16 files changed

+250
-102
lines changed

benchmarks/src/bin/parquet_filter_pushdown.rs

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,18 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
2323
use arrow::record_batch::RecordBatch;
2424
use arrow::util::pretty;
2525
use datafusion::common::{Result, ToDFSchema};
26+
use datafusion::config::{
27+
ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
28+
OPT_PARQUET_REORDER_FILTERS,
29+
};
2630
use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
2731
use datafusion::datasource::object_store::ObjectStoreUrl;
2832
use datafusion::execution::context::ExecutionProps;
2933
use datafusion::logical_expr::{lit, or, Expr};
3034
use datafusion::optimizer::utils::disjunction;
3135
use datafusion::physical_expr::create_physical_expr;
3236
use datafusion::physical_plan::collect;
33-
use datafusion::physical_plan::file_format::{
34-
FileScanConfig, ParquetExec, ParquetScanOptions,
35-
};
37+
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
3638
use datafusion::physical_plan::filter::FilterExec;
3739
use datafusion::prelude::{col, SessionConfig, SessionContext};
3840
use object_store::path::Path;
@@ -109,6 +111,13 @@ async fn main() -> Result<()> {
109111
Ok(())
110112
}
111113

114+
#[derive(Debug, Clone)]
115+
struct ParquetScanOptions {
116+
pushdown_filters: bool,
117+
reorder_filters: bool,
118+
enable_page_index: bool,
119+
}
120+
112121
async fn run_benchmarks(
113122
ctx: &mut SessionContext,
114123
object_store_url: ObjectStoreUrl,
@@ -117,15 +126,21 @@ async fn run_benchmarks(
117126
debug: bool,
118127
) -> Result<()> {
119128
let scan_options_matrix = vec![
120-
ParquetScanOptions::default(),
121-
ParquetScanOptions::default()
122-
.with_page_index(true)
123-
.with_pushdown_filters(true)
124-
.with_reorder_predicates(true),
125-
ParquetScanOptions::default()
126-
.with_page_index(true)
127-
.with_pushdown_filters(true)
128-
.with_reorder_predicates(false),
129+
ParquetScanOptions {
130+
pushdown_filters: false,
131+
reorder_filters: false,
132+
enable_page_index: false,
133+
},
134+
ParquetScanOptions {
135+
pushdown_filters: true,
136+
reorder_filters: true,
137+
enable_page_index: true,
138+
},
139+
ParquetScanOptions {
140+
pushdown_filters: true,
141+
reorder_filters: true,
142+
enable_page_index: false,
143+
},
129144
];
130145

131146
let filter_matrix = vec![
@@ -193,6 +208,18 @@ async fn exec_scan(
193208
debug: bool,
194209
) -> Result<usize> {
195210
let schema = BatchBuilder::schema();
211+
212+
let ParquetScanOptions {
213+
pushdown_filters,
214+
reorder_filters,
215+
enable_page_index,
216+
} = scan_options;
217+
218+
let mut config_options = ConfigOptions::new();
219+
config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
220+
config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
221+
config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
222+
196223
let scan_config = FileScanConfig {
197224
object_store_url,
198225
file_schema: schema.clone(),
@@ -206,6 +233,7 @@ async fn exec_scan(
206233
projection: None,
207234
limit: None,
208235
table_partition_cols: vec![],
236+
config_options: config_options.into_shareable(),
209237
};
210238

211239
let df_schema = schema.clone().to_dfschema()?;
@@ -217,9 +245,7 @@ async fn exec_scan(
217245
&ExecutionProps::default(),
218246
)?;
219247

220-
let parquet_exec = Arc::new(
221-
ParquetExec::new(scan_config, Some(filter), None).with_scan_options(scan_options),
222-
);
248+
let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
223249

224250
let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
225251

datafusion/core/src/config.rs

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ use arrow::datatypes::DataType;
2121
use datafusion_common::ScalarValue;
2222
use itertools::Itertools;
2323
use log::warn;
24+
use parking_lot::RwLock;
2425
use std::collections::HashMap;
2526
use std::env;
27+
use std::sync::Arc;
2628

2729
/// Configuration option "datafusion.optimizer.filter_null_join_keys"
2830
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";
@@ -43,13 +45,25 @@ pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";
4345
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
4446
"datafusion.execution.coalesce_target_batch_size";
4547

48+
/// Configuration option "datafusion.execution.time_zone"
49+
pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";
50+
51+
/// Configuration option "datafusion.execution.parquet.pushdown_filters"
52+
pub const OPT_PARQUET_PUSHDOWN_FILTERS: &str =
53+
"datafusion.execution.parquet.pushdown_filters";
54+
55+
/// Configuration option "datafusion.execution.parquet.reorder_filters"
56+
pub const OPT_PARQUET_REORDER_FILTERS: &str =
57+
"datafusion.execution.parquet.reorder_filters";
58+
59+
/// Configuration option "datafusion.execution.parquet.enable_page_index"
60+
pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
61+
"datafusion.execution.parquet.enable_page_index";
62+
4663
/// Configuration option "datafusion.optimizer.skip_failed_rules"
4764
pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
4865
"datafusion.optimizer.skip_failed_rules";
4966

50-
/// Configuration option "datafusion.execution.time_zone"
51-
pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";
52-
5367
/// Definition of a configuration option
5468
pub struct ConfigDefinition {
5569
/// key used to identifier this configuration option
@@ -173,11 +187,11 @@ impl BuiltInConfigs {
173187
false,
174188
),
175189
ConfigDefinition::new_u64(
176-
OPT_BATCH_SIZE,
177-
"Default batch size while creating new batches, it's especially useful for \
178-
buffer-in-memory batches since creating tiny batches would results in too much metadata \
179-
memory consumption.",
180-
8192,
190+
OPT_BATCH_SIZE,
191+
"Default batch size while creating new batches, it's especially useful for \
192+
buffer-in-memory batches since creating tiny batches would results in too much metadata \
193+
memory consumption.",
194+
8192,
181195
),
182196
ConfigDefinition::new_bool(
183197
OPT_COALESCE_BATCHES,
@@ -191,23 +205,43 @@ impl BuiltInConfigs {
191205
ConfigDefinition::new_u64(
192206
OPT_COALESCE_TARGET_BATCH_SIZE,
193207
format!("Target batch size when coalescing batches. Uses in conjunction with the \
194-
configuration setting '{}'.", OPT_COALESCE_BATCHES),
208+
configuration setting '{}'.", OPT_COALESCE_BATCHES),
195209
4096,
196210
),
211+
ConfigDefinition::new_string(
212+
OPT_TIME_ZONE,
213+
"The session time zone which some functions require \
214+
e.g. EXTRACT(HOUR from SOME_TIME) shifts the underlying datetime according to the time zone,
215+
then extract the hour.",
216+
"UTC".into()
217+
),
218+
ConfigDefinition::new_bool(
219+
OPT_PARQUET_PUSHDOWN_FILTERS,
220+
"If true, filter expressions are applied during the parquet decoding operation to \
221+
reduce the number of rows decoded.",
222+
false,
223+
),
224+
ConfigDefinition::new_bool(
225+
OPT_PARQUET_REORDER_FILTERS,
226+
"If true, filter expressions evaluated during the parquet decoding operation \
227+
will be reordered heuristically to minimize the cost of evaluation. If false, \
228+
the filters are applied in the same order as written in the query.",
229+
false,
230+
),
231+
ConfigDefinition::new_bool(
232+
OPT_PARQUET_ENABLE_PAGE_INDEX,
233+
"If true, uses parquet data page level metadata (Page Index) statistics \
234+
to reduce the number of rows decoded.",
235+
false,
236+
),
197237
ConfigDefinition::new_bool(
198238
OPT_OPTIMIZER_SKIP_FAILED_RULES,
199239
"When set to true, the logical plan optimizer will produce warning \
200240
messages if any optimization rules produce errors and then proceed to the next \
201241
rule. When set to false, any rules that produce errors will cause the query to fail.",
202242
true
203243
),
204-
ConfigDefinition::new_string(
205-
OPT_TIME_ZONE,
206-
"The session time zone which some function require \
207-
e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
208-
then extract the hour",
209-
"UTC".into()
210-
)]
244+
]
211245
}
212246
}
213247

@@ -255,8 +289,16 @@ impl ConfigOptions {
255289
Self { options }
256290
}
257291

258-
/// Create new ConfigOptions struct, taking values from environment variables where possible.
259-
/// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` to control `datafusion.execution.batch_size`.
292+
/// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
293+
pub fn into_shareable(self) -> Arc<RwLock<Self>> {
294+
Arc::new(RwLock::new(self))
295+
}
296+
297+
/// Create new ConfigOptions struct, taking values from
298+
/// environment variables where possible.
299+
///
300+
/// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
301+
/// control `datafusion.execution.batch_size`.
260302
pub fn from_env() -> Self {
261303
let built_in = BuiltInConfigs::new();
262304
let mut options = HashMap::with_capacity(built_in.config_definitions.len());

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
8484
#[cfg(test)]
8585
pub(crate) mod test_util {
8686
use super::*;
87+
use crate::config::ConfigOptions;
8788
use crate::datasource::listing::PartitionedFile;
8889
use crate::datasource::object_store::ObjectStoreUrl;
8990
use crate::test::object_store::local_unpartitioned_file;
@@ -122,6 +123,7 @@ pub(crate) mod test_util {
122123
projection,
123124
limit,
124125
table_partition_cols: vec![],
126+
config_options: ConfigOptions::new().into_shareable(),
125127
},
126128
&[],
127129
)

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ impl TableProvider for ListingTable {
404404
projection: projection.clone(),
405405
limit,
406406
table_partition_cols: self.options.table_partition_cols.clone(),
407+
config_options: ctx.config.config_options(),
407408
},
408409
filters,
409410
)

datafusion/core/src/execution/context.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1184,7 +1184,7 @@ impl SessionConfig {
11841184
/// Create an execution config with config options read from the environment
11851185
pub fn from_env() -> Self {
11861186
Self {
1187-
config_options: Arc::new(RwLock::new(ConfigOptions::from_env())),
1187+
config_options: ConfigOptions::from_env().into_shareable(),
11881188
..Default::default()
11891189
}
11901190
}
@@ -1324,6 +1324,13 @@ impl SessionConfig {
13241324
map
13251325
}
13261326

1327+
/// Return a handle to the shared configuration options.
1328+
///
1329+
/// [`config_options`]: SessionConfig::config_options
1330+
pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
1331+
self.config_options.clone()
1332+
}
1333+
13271334
/// Add extensions.
13281335
///
13291336
/// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.

datafusion/core/src/execution/options.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@ use crate::datasource::{
3434
listing::ListingOptions,
3535
};
3636

37-
/// CSV file read option
37+
/// Options that control the reading of CSV files.
38+
///
39+
/// Note this structure is supplied when a datasource is created and
40+
/// cannot vary from statement to statement. For settings that
41+
/// can vary statement to statement see
42+
/// [`ConfigOptions`](crate::config::ConfigOptions).
3843
#[derive(Clone)]
3944
pub struct CsvReadOptions<'a> {
4045
/// Does the CSV file have a header?
@@ -150,7 +155,12 @@ impl<'a> CsvReadOptions<'a> {
150155
}
151156
}
152157

153-
/// Parquet read options
158+
/// Options that control the reading of Parquet files.
159+
///
160+
/// Note this structure is supplied when a datasource is created and
161+
/// can not not vary from statement to statement. For settings that
162+
/// can vary statement to statement see
163+
/// [`ConfigOptions`](crate::config::ConfigOptions).
154164
#[derive(Clone)]
155165
pub struct ParquetReadOptions<'a> {
156166
/// File extension; only files with this extension are selected for data input.
@@ -160,10 +170,12 @@ pub struct ParquetReadOptions<'a> {
160170
pub table_partition_cols: Vec<String>,
161171
/// Should DataFusion parquet reader use the predicate to prune data,
162172
/// overridden by value on execution::context::SessionConfig
173+
// TODO move this into ConfigOptions
163174
pub parquet_pruning: bool,
164175
/// Tell the parquet reader to skip any metadata that may be in
165176
/// the file Schema. This can help avoid schema conflicts due to
166177
/// metadata. Defaults to true.
178+
// TODO move this into ConfigOptions
167179
pub skip_metadata: bool,
168180
}
169181

@@ -217,7 +229,12 @@ impl<'a> ParquetReadOptions<'a> {
217229
}
218230
}
219231

220-
/// Avro read options
232+
/// Options that control the reading of AVRO files.
233+
///
234+
/// Note this structure is supplied when a datasource is created and
235+
/// can not not vary from statement to statement. For settings that
236+
/// can vary statement to statement see
237+
/// [`ConfigOptions`](crate::config::ConfigOptions).
221238
#[derive(Clone)]
222239
pub struct AvroReadOptions<'a> {
223240
/// The data source schema.
@@ -261,7 +278,12 @@ impl<'a> AvroReadOptions<'a> {
261278
}
262279
}
263280

264-
/// Line-delimited JSON read options
281+
/// Options that control the reading of Line-delimited JSON files (NDJson)
282+
///
283+
/// Note this structure is supplied when a datasource is created and
284+
/// cannot vary from statement to statement. For settings that
285+
/// can vary statement to statement see
286+
/// [`ConfigOptions`](crate::config::ConfigOptions).
265287
#[derive(Clone)]
266288
pub struct NdJsonReadOptions<'a> {
267289
/// The data source schema.

datafusion/core/src/physical_optimizer/repartition.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ mod tests {
240240
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
241241

242242
use super::*;
243+
use crate::config::ConfigOptions;
243244
use crate::datasource::listing::PartitionedFile;
244245
use crate::datasource::object_store::ObjectStoreUrl;
245246
use crate::physical_plan::aggregates::{
@@ -269,6 +270,7 @@ mod tests {
269270
projection: None,
270271
limit: None,
271272
table_partition_cols: vec![],
273+
config_options: ConfigOptions::new().into_shareable(),
272274
},
273275
None,
274276
None,

datafusion/core/src/physical_plan/file_format/avro.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ mod private {
208208
#[cfg(test)]
209209
#[cfg(feature = "avro")]
210210
mod tests {
211+
use crate::config::ConfigOptions;
211212
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
212213
use crate::datasource::listing::PartitionedFile;
213214
use crate::datasource::object_store::ObjectStoreUrl;
@@ -237,6 +238,7 @@ mod tests {
237238
projection: Some(vec![0, 1, 2]),
238239
limit: None,
239240
table_partition_cols: vec![],
241+
config_options: ConfigOptions::new().into_shareable(),
240242
});
241243
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
242244

@@ -306,6 +308,7 @@ mod tests {
306308
projection,
307309
limit: None,
308310
table_partition_cols: vec![],
311+
config_options: ConfigOptions::new().into_shareable(),
309312
});
310313
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
311314

@@ -374,6 +377,7 @@ mod tests {
374377
statistics: Statistics::default(),
375378
limit: None,
376379
table_partition_cols: vec!["date".to_owned()],
380+
config_options: ConfigOptions::new().into_shareable(),
377381
});
378382
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
379383

0 commit comments

Comments
 (0)