Skip to content

Commit 49fb6dd

Browse files
committed
Skip useless pruning predicates
1 parent 671178e commit 49fb6dd

File tree

2 files changed

+66
-12
lines changed

2 files changed

+66
-12
lines changed

datafusion/core/src/physical_optimizer/pruning.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use datafusion_expr::expr::{BinaryExpr, Cast};
5050
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
5151
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
5252
use datafusion_physical_expr::create_physical_expr;
53+
use datafusion_physical_expr::expressions::Literal;
5354
use log::trace;
5455

5556
/// Interface to pass statistics information to [`PruningPredicate`]
@@ -224,6 +225,15 @@ impl PruningPredicate {
224225
&self.predicate_expr
225226
}
226227

228+
/// Returns true if this pruning predicate is "always true" (aka will not prune anything)
229+
pub fn allways_true(&self) -> bool {
230+
self.predicate_expr
231+
.as_any()
232+
.downcast_ref::<Literal>()
233+
.map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
234+
.unwrap_or_default()
235+
}
236+
227237
/// Returns all need column indexes to evaluate this pruning predicate
228238
pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
229239
let mut set = HashSet::new();

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,19 +97,28 @@ impl ParquetExec {
9797
let predicate_creation_errors =
9898
MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");
9999

100-
let pruning_predicate = predicate.and_then(|predicate_expr| {
101-
match PruningPredicate::try_new(
102-
predicate_expr,
103-
base_config.file_schema.clone(),
104-
) {
105-
Ok(pruning_predicate) => Some(pruning_predicate),
106-
Err(e) => {
107-
debug!("Could not create pruning predicate for: {}", e);
108-
predicate_creation_errors.add(1);
100+
let pruning_predicate = predicate
101+
.and_then(|predicate_expr| {
102+
match PruningPredicate::try_new(
103+
predicate_expr,
104+
base_config.file_schema.clone(),
105+
) {
106+
Ok(pruning_predicate) => Some(pruning_predicate),
107+
Err(e) => {
108+
debug!("Could not create pruning predicate for: {}", e);
109+
predicate_creation_errors.add(1);
110+
None
111+
}
112+
}
113+
})
114+
.and_then(|pruning_predicate| {
115+
// If the pruning predicate can't prune anything, don't try
116+
if pruning_predicate.allways_true() {
109117
None
118+
} else {
119+
Some(pruning_predicate)
110120
}
111-
}
112-
});
121+
});
113122

114123
let (projected_schema, projected_statistics) = base_config.project();
115124

@@ -680,7 +689,7 @@ mod tests {
680689
use chrono::{TimeZone, Utc};
681690
use datafusion_common::assert_contains;
682691
use datafusion_common::ScalarValue;
683-
use datafusion_expr::{col, lit};
692+
use datafusion_expr::{col, lit, when};
684693
use futures::StreamExt;
685694
use object_store::local::LocalFileSystem;
686695
use object_store::path::Path;
@@ -1544,6 +1553,10 @@ mod tests {
15441553

15451554
let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
15461555

1556+
// should have a pruning predicate
1557+
let pruning_predicate = &rt.parquet_exec.pruning_predicate;
1558+
assert!(pruning_predicate.is_some());
1559+
15471560
// convert to explain plan form
15481561
let display = displayable(rt.parquet_exec.as_ref()).indent().to_string();
15491562

@@ -1554,6 +1567,37 @@ mod tests {
15541567
assert_contains!(&display, "projection=[c1]");
15551568
}
15561569

1570+
#[tokio::test]
1571+
async fn parquet_exec_skip_empty_pruning() {
1572+
let c1: ArrayRef = Arc::new(StringArray::from(vec![
1573+
Some("Foo"),
1574+
None,
1575+
Some("bar"),
1576+
Some("bar"),
1577+
Some("bar"),
1578+
Some("bar"),
1579+
Some("zzz"),
1580+
]));
1581+
1582+
// batch1: c1(string)
1583+
let batch1 = create_batch(vec![("c1", c1.clone())]);
1584+
1585+
// filter is too complicated for pruning
1586+
let filter = when(col("c1").not_eq(lit("bar")), lit(true))
1587+
.otherwise(lit(false))
1588+
.unwrap();
1589+
1590+
let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
1591+
1592+
// Should not contain a pruning predicate
1593+
let pruning_predicate = &rt.parquet_exec.pruning_predicate;
1594+
assert!(
1595+
pruning_predicate.is_none(),
1596+
"Still had pruning predicate: {:?}",
1597+
pruning_predicate
1598+
)
1599+
}
1600+
15571601
/// returns the sum of all the metrics with the specified name
15581602
/// the returned set.
15591603
///

0 commit comments

Comments
 (0)