Skip to content

Commit ec6de25

Browse files
committed
Push down more predicates into parquet exec
1 parent a584ff5 commit ec6de25

File tree

1 file changed

+27
-8
lines changed
  • datafusion/core/src/physical_plan/file_format

1 file changed

+27
-8
lines changed

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ pub struct ParquetExec {
7575
projected_schema: SchemaRef,
7676
/// Execution metrics
7777
metrics: ExecutionPlanMetricsSet,
78+
/// Optional predicate for row filtering during parquet scan
79+
predicate: Option<Expr>,
7880
/// Optional predicate for pruning row groups
7981
pruning_predicate: Option<PruningPredicate>,
8082
/// Optional hint for the size of the parquet metadata
@@ -98,6 +100,7 @@ impl ParquetExec {
98100
MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");
99101

100102
let pruning_predicate = predicate
103+
.clone()
101104
.and_then(|predicate_expr| {
102105
match PruningPredicate::try_new(
103106
predicate_expr,
@@ -127,6 +130,7 @@ impl ParquetExec {
127130
projected_schema,
128131
projected_statistics,
129132
metrics,
133+
predicate,
130134
pruning_predicate,
131135
metadata_size_hint,
132136
parquet_file_reader_factory: None,
@@ -284,6 +288,7 @@ impl ExecutionPlan for ParquetExec {
284288
partition_index,
285289
projection: Arc::from(projection),
286290
batch_size: ctx.session_config().batch_size(),
291+
predicate: self.predicate.clone(),
287292
pruning_predicate: self.pruning_predicate.clone(),
288293
table_schema: self.base_config.file_schema.clone(),
289294
metadata_size_hint: self.metadata_size_hint,
@@ -312,6 +317,12 @@ impl ExecutionPlan for ParquetExec {
312317
) -> std::fmt::Result {
313318
match t {
314319
DisplayFormatType::Default => {
320+
let predicate_string = self
321+
.predicate
322+
.as_ref()
323+
.map(|p| format!(", predicate={}", p))
324+
.unwrap_or_default();
325+
315326
let pruning_predicate_string = self
316327
.pruning_predicate
317328
.as_ref()
@@ -325,9 +336,10 @@ impl ExecutionPlan for ParquetExec {
325336

326337
write!(
327338
f,
328-
"ParquetExec: limit={:?}, partitions={}{}{}, projection={}",
339+
"ParquetExec: limit={:?}, partitions={}{}{}{}, projection={}",
329340
self.base_config.limit,
330341
super::FileGroupsDisplay(&self.base_config.file_groups),
342+
predicate_string,
331343
pruning_predicate_string,
332344
output_ordering_string,
333345
super::ProjectSchemaDisplay(&self.projected_schema),
@@ -364,6 +376,7 @@ struct ParquetOpener {
364376
partition_index: usize,
365377
projection: Arc<[usize]>,
366378
batch_size: usize,
379+
predicate: Option<Expr>,
367380
pruning_predicate: Option<PruningPredicate>,
368381
table_schema: SchemaRef,
369382
metadata_size_hint: Option<usize>,
@@ -399,6 +412,7 @@ impl FileOpener for ParquetOpener {
399412
let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
400413
let batch_size = self.batch_size;
401414
let projection = self.projection.clone();
415+
let predicate = self.predicate.clone();
402416
let pruning_predicate = self.pruning_predicate.clone();
403417
let table_schema = self.table_schema.clone();
404418
let reorder_predicates = self.reorder_filters;
@@ -418,11 +432,8 @@ impl FileOpener for ParquetOpener {
418432
adapted_projections.iter().cloned(),
419433
);
420434

421-
// Filter pushdown: evlauate predicates during scan
422-
if let Some(predicate) = pushdown_filters
423-
.then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
424-
.flatten()
425-
{
435+
// Filter pushdown: evaluate predicates during scan
436+
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
426437
let row_filter = row_filter::build_row_filter(
427438
predicate.clone(),
428439
builder.schema().as_ref(),
@@ -1564,6 +1575,9 @@ mod tests {
15641575
&display,
15651576
"pruning_predicate=c1_min@0 != bar OR bar != c1_max@1"
15661577
);
1578+
1579+
assert_contains!(&display, r#"predicate=c1 != Utf8("bar")"#);
1580+
15671581
assert_contains!(&display, "projection=[c1]");
15681582
}
15691583

@@ -1587,15 +1601,20 @@ mod tests {
15871601
.otherwise(lit(false))
15881602
.unwrap();
15891603

1590-
let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
1604+
let rt =
1605+
round_trip(vec![batch1], None, None, Some(filter.clone()), true, false).await;
15911606

15921607
// Should not contain a pruning predicate
15931608
let pruning_predicate = &rt.parquet_exec.pruning_predicate;
15941609
assert!(
15951610
pruning_predicate.is_none(),
15961611
"Still had pruning predicate: {:?}",
15971612
pruning_predicate
1598-
)
1613+
);
1614+
1615+
// but it still has a pushdown predicate
1616+
let predicate = rt.parquet_exec.predicate.as_ref();
1617+
assert_eq!(predicate, Some(&filter));
15991618
}
16001619

16011620
/// returns the sum of all the metrics with the specified name

0 commit comments

Comments
 (0)