Push down more predicates into ParquetExec #4279

Merged: 1 commit, Nov 21, 2022

35 changes: 27 additions & 8 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -75,6 +75,8 @@ pub struct ParquetExec {
projected_schema: SchemaRef,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
predicate: Option<Expr>,
/// Optional predicate for pruning row groups
pruning_predicate: Option<PruningPredicate>,
/// Optional hint for the size of the parquet metadata
@@ -98,6 +100,7 @@ impl ParquetExec {
MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");

let pruning_predicate = predicate
.clone()
[Contributor Author] The key difference here is that the original predicate is kept, even if the creation of the pruning predicate fails.

.and_then(|predicate_expr| {
match PruningPredicate::try_new(
predicate_expr,
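
To make the pattern in the review comment above concrete, here is a minimal, self-contained sketch — the Expr/Pruning types and try_new_pruning function are stand-ins invented for illustration, not DataFusion's real API — showing how the original predicate survives even when the derived pruning predicate cannot be built:

// Sketch only: stand-in types, not DataFusion's real Expr/PruningPredicate.
#[derive(Clone, Debug)]
struct Expr(String); // stand-in for a logical predicate expression

#[derive(Debug)]
struct Pruning(String); // stand-in for a pruning predicate

fn try_new_pruning(expr: Expr) -> Result<Pruning, String> {
    // Pretend expressions containing a UDF cannot be turned into min/max pruning.
    if expr.0.contains("udf") {
        Err(format!("cannot build pruning predicate for {:?}", expr))
    } else {
        Ok(Pruning(expr.0))
    }
}

fn main() {
    let predicate: Option<Expr> = Some(Expr("some_udf(c1) != 'bar'".into()));

    // Clone first so the original predicate is kept regardless of the outcome below.
    let pruning_predicate = predicate.clone().and_then(|expr| match try_new_pruning(expr) {
        Ok(p) => Some(p),
        Err(e) => {
            eprintln!("could not create pruning predicate: {e}"); // best-effort only
            None
        }
    });

    // Pruning fell through, but row-level filtering can still use the kept predicate.
    println!("pruning_predicate: {:?}", pruning_predicate);
    println!("predicate:         {:?}", predicate);
}
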
@@ -127,6 +130,7 @@ impl ParquetExec {
projected_schema,
projected_statistics,
metrics,
predicate,
pruning_predicate,
metadata_size_hint,
parquet_file_reader_factory: None,
@@ -284,6 +288,7 @@ impl ExecutionPlan for ParquetExec {
partition_index,
projection: Arc::from(projection),
batch_size: ctx.session_config().batch_size(),
predicate: self.predicate.clone(),
[Contributor Author] I don't like all the cloning -- I have a follow-on PR (#4283) to stop the copying.

pruning_predicate: self.pruning_predicate.clone(),
table_schema: self.base_config.file_schema.clone(),
metadata_size_hint: self.metadata_size_hint,
@@ -312,6 +317,12 @@
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
let predicate_string = self
[Contributor Author] The predicate is now explicitly shown in EXPLAIN as well (as it is different from the pruning predicate).

.predicate
.as_ref()
.map(|p| format!(", predicate={}", p))
.unwrap_or_default();

let pruning_predicate_string = self
.pruning_predicate
.as_ref()
@@ -325,9 +336,10 @@

write!(
f,
"ParquetExec: limit={:?}, partitions={}{}{}, projection={}",
"ParquetExec: limit={:?}, partitions={}{}{}{}, projection={}",
self.base_config.limit,
super::FileGroupsDisplay(&self.base_config.file_groups),
predicate_string,
pruning_predicate_string,
output_ordering_string,
super::ProjectSchemaDisplay(&self.projected_schema),
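
Given the format string above and the assertions in the test further down, the Default EXPLAIN rendering of a ParquetExec now includes both fields; roughly like the line below (partition/file details elided, and the exact text depends on the plan):

ParquetExec: limit=None, partitions=[...], predicate=c1 != Utf8("bar"), pruning_predicate=c1_min@0 != bar OR bar != c1_max@1, projection=[c1]
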
@@ -364,6 +376,7 @@ struct ParquetOpener {
partition_index: usize,
projection: Arc<[usize]>,
batch_size: usize,
predicate: Option<Expr>,
pruning_predicate: Option<PruningPredicate>,
table_schema: SchemaRef,
metadata_size_hint: Option<usize>,
@@ -399,6 +412,7 @@ impl FileOpener for ParquetOpener {
let schema_adapter = SchemaAdapter::new(self.table_schema.clone());
let batch_size = self.batch_size;
let projection = self.projection.clone();
let predicate = self.predicate.clone();
let pruning_predicate = self.pruning_predicate.clone();
let table_schema = self.table_schema.clone();
let reorder_predicates = self.reorder_filters;
@@ -418,11 +432,8 @@
adapted_projections.iter().cloned(),
);

// Filter pushdown: evlauate predicates during scan
if let Some(predicate) = pushdown_filters
.then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
.flatten()
{
// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
predicate.clone(),
builder.schema().as_ref(),
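
The gate in the new code is plain Option plumbing from the standard library; below is a minimal, self-contained sketch of the then_some(..).flatten() combination (the flag name and predicate value are made up for illustration, not taken from the session config):

fn main() {
    let pushdown_filters = true;                        // stand-in for the session option
    let predicate: Option<&str> = Some("c1 != 'bar'");  // stand-in for the kept Expr

    // bool::then_some wraps the Option in Some(..) only when the flag is set,
    // and flatten collapses the resulting Option<Option<&str>> back to Option<&str>.
    if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
        println!("would build a row filter from: {predicate}");
    } else {
        println!("no row filter: pushdown disabled or no predicate available");
    }
}
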
@@ -1564,6 +1575,9 @@ mod tests {
&display,
"pruning_predicate=c1_min@0 != bar OR bar != c1_max@1"
);

assert_contains!(&display, r#"predicate=c1 != Utf8("bar")"#);
[Member] 👍

assert_contains!(&display, "projection=[c1]");
}

@@ -1587,15 +1601,20 @@
.otherwise(lit(false))
.unwrap();

let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
let rt =
round_trip(vec![batch1], None, None, Some(filter.clone()), true, false).await;

// Should not contain a pruning predicate
let pruning_predicate = &rt.parquet_exec.pruning_predicate;
assert!(
pruning_predicate.is_none(),
"Still had pruning predicate: {:?}",
pruning_predicate
)
);

// but does still have a pushdown predicate
let predicate = rt.parquet_exec.predicate.as_ref();
assert_eq!(predicate, Some(&filter));
}

/// returns the sum of all the metrics with the specified name