Skip to content

Commit e96004d

Browse files
committed
init
1 parent 34fbe8e commit e96004d

File tree

29 files changed

+926
-64
lines changed

29 files changed

+926
-64
lines changed

datafusion/common/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,10 @@ config_namespace! {
617617
/// then the output will be coerced to a non-view.
618618
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
619619
pub expand_views_at_output: bool, default = false
620+
621+
/// When set to true, DataFusion will try to push build side statistics
622+
/// down to the probe phase
623+
pub dynamic_join_pushdown: bool, default = true
620624
}
621625
}
622626

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use crate::{
4242
use arrow::datatypes::SchemaRef;
4343
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
4444

45+
use datafusion_physical_plan::joins::DynamicFilterInfo;
4546
use itertools::Itertools;
4647
use log::debug;
4748

@@ -282,6 +283,8 @@ pub struct ParquetExec {
282283
table_parquet_options: TableParquetOptions,
283284
/// Optional user defined schema adapter
284285
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
286+
/// dynamic filters (like join filters)
287+
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
285288
}
286289

287290
impl From<ParquetExec> for ParquetExecBuilder {
@@ -291,7 +294,6 @@ impl From<ParquetExec> for ParquetExecBuilder {
291294
}
292295

293296
/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
294-
///
295297
/// See example on [`ParquetExec`].
296298
pub struct ParquetExecBuilder {
297299
file_scan_config: FileScanConfig,
@@ -463,6 +465,7 @@ impl ParquetExecBuilder {
463465
cache,
464466
table_parquet_options,
465467
schema_adapter_factory,
468+
dynamic_filters: None,
466469
}
467470
}
468471
}
@@ -515,6 +518,7 @@ impl ParquetExec {
515518
cache: _,
516519
table_parquet_options,
517520
schema_adapter_factory,
521+
..
518522
} = self;
519523
ParquetExecBuilder {
520524
file_scan_config: base_config,
@@ -579,6 +583,15 @@ impl ParquetExec {
579583
self
580584
}
581585

586+
/// Sets the dynamic filters (e.g. join filters) to apply during the scan
587+
pub fn with_dynamic_filter(
588+
mut self,
589+
dynamic_filter: Option<Arc<DynamicFilterInfo>>,
590+
) -> Self {
591+
self.dynamic_filters = dynamic_filter;
592+
self
593+
}
594+
582595
/// If true, the predicate will be used during the parquet scan.
583596
/// Defaults to false
584597
///
@@ -711,10 +724,15 @@ impl DisplayAs for ParquetExec {
711724
)
712725
})
713726
.unwrap_or_default();
714-
727+
let dynamic_filter =
728+
format!("dynamic_filter: {:?}", self.dynamic_filters);
715729
write!(f, "ParquetExec: ")?;
716730
self.base_config.fmt_as(t, f)?;
717-
write!(f, "{}{}", predicate_string, pruning_predicate_string,)
731+
write!(
732+
f,
733+
"{}{}{}",
734+
predicate_string, pruning_predicate_string, dynamic_filter
735+
)
718736
}
719737
}
720738
}
@@ -798,13 +816,18 @@ impl ExecutionPlan for ParquetExec {
798816
.schema_adapter_factory
799817
.clone()
800818
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
819+
let final_predicate = if let Some(dynamic_filter) = &self.dynamic_filters {
820+
dynamic_filter.final_predicate(self.predicate.clone())
821+
} else {
822+
self.predicate.clone()
823+
};
801824

802825
let opener = ParquetOpener {
803826
partition_index,
804827
projection: Arc::from(projection),
805828
batch_size: ctx.session_config().batch_size(),
806829
limit: self.base_config.limit,
807-
predicate: self.predicate.clone(),
830+
predicate: final_predicate,
808831
pruning_predicate: self.pruning_predicate.clone(),
809832
page_pruning_predicate: self.page_pruning_predicate.clone(),
810833
table_schema: self.base_config.file_schema.clone(),
@@ -862,6 +885,7 @@ impl ExecutionPlan for ParquetExec {
862885
cache: self.cache.clone(),
863886
table_parquet_options: self.table_parquet_options.clone(),
864887
schema_adapter_factory: self.schema_adapter_factory.clone(),
888+
dynamic_filters: self.dynamic_filters.clone(),
865889
}))
866890
}
867891
}

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ impl FileOpener for ParquetOpener {
160160
&file_metrics,
161161
Arc::clone(&schema_mapping),
162162
);
163-
164163
match row_filter {
165164
Ok(Some(filter)) => {
166165
builder = builder.with_row_filter(filter);

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,10 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
5252
use datafusion_physical_expr::{
5353
physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
5454
};
55-
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
56-
use datafusion_physical_plan::ExecutionPlanProperties;
57-
5855
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
5956
use datafusion_physical_optimizer::PhysicalOptimizerRule;
57+
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
58+
use datafusion_physical_plan::ExecutionPlanProperties;
6059
use itertools::izip;
6160

6261
/// The `EnforceDistribution` rule ensures that distribution requirements are
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Push dynamic join filters down to the scan execution, if any exist
19+
20+
use std::sync::Arc;
21+
22+
use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec};
23+
24+
use crate::datasource::physical_plan::ParquetExec;
25+
use crate::physical_plan::ExecutionPlan;
26+
use datafusion_common::tree_node::{Transformed, TransformedResult};
27+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
28+
use datafusion_physical_plan::joins::DynamicFilterInfo;
29+
30+
/// This rule pushes build side statistics down to the probe phase
31+
#[derive(Default, Debug)]
32+
pub struct JoinFilterPushdown {}
33+
34+
impl JoinFilterPushdown {
35+
#[allow(missing_docs)]
36+
pub fn new() -> Self {
37+
Self {}
38+
}
39+
}
40+
41+
impl PhysicalOptimizerRule for JoinFilterPushdown {
42+
fn optimize(
43+
&self,
44+
plan: Arc<dyn ExecutionPlan>,
45+
config: &ConfigOptions,
46+
) -> Result<Arc<dyn ExecutionPlan>> {
47+
if !config.optimizer.dynamic_join_pushdown {
48+
return Ok(plan);
49+
}
50+
optimize_impl(plan, &mut None).data()
51+
}
52+
53+
fn name(&self) -> &str {
54+
"JoinFilterPushdown"
55+
}
56+
57+
fn schema_check(&self) -> bool {
58+
true
59+
}
60+
}
61+
62+
fn optimize_impl(
63+
plan: Arc<dyn ExecutionPlan>,
64+
join_filters: &mut Option<Arc<DynamicFilterInfo>>,
65+
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
66+
if let Some(hashjoin_exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
67+
join_filters.clone_from(&hashjoin_exec.dynamic_filters_pushdown);
68+
let new_right = optimize_impl(hashjoin_exec.right.clone(), join_filters)?;
69+
if new_right.transformed {
70+
let new_hash_join = HashJoinExec::try_new(
71+
hashjoin_exec.left().clone(),
72+
new_right.data,
73+
hashjoin_exec.on.clone(),
74+
hashjoin_exec.filter().cloned(),
75+
hashjoin_exec.join_type(),
76+
hashjoin_exec.projection.clone(),
77+
*hashjoin_exec.partition_mode(),
78+
hashjoin_exec.null_equals_null(),
79+
)?
80+
.with_dynamic_filter_info(hashjoin_exec.dynamic_filters_pushdown.clone());
81+
return Ok(Transformed::yes(Arc::new(new_hash_join)));
82+
}
83+
Ok(Transformed::no(plan))
84+
} else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
85+
if let Some(dynamic_filters) = join_filters {
86+
let final_exec = parquet_exec
87+
.clone()
88+
.with_dynamic_filter(Some(dynamic_filters.clone()));
89+
return Ok(Transformed::yes(Arc::new(final_exec)));
90+
}
91+
Ok(Transformed::no(plan))
92+
} else {
93+
let children = plan.children();
94+
let mut new_children = Vec::with_capacity(children.len());
95+
let mut transformed = false;
96+
97+
for child in children {
98+
let new_child = optimize_impl(child.clone(), join_filters)?;
99+
if new_child.transformed {
100+
transformed = true;
101+
}
102+
new_children.push(new_child.data);
103+
}
104+
105+
if transformed {
106+
let new_plan = plan.with_new_children(new_children)?;
107+
Ok(Transformed::yes(new_plan))
108+
} else {
109+
Ok(Transformed::no(plan))
110+
}
111+
}
112+
}

datafusion/core/src/physical_optimizer/join_selection.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -383,16 +383,19 @@ fn try_collect_left(
383383
{
384384
Ok(Some(swap_hash_join(hash_join, PartitionMode::CollectLeft)?))
385385
} else {
386-
Ok(Some(Arc::new(HashJoinExec::try_new(
387-
Arc::clone(left),
388-
Arc::clone(right),
389-
hash_join.on().to_vec(),
390-
hash_join.filter().cloned(),
391-
hash_join.join_type(),
392-
hash_join.projection.clone(),
393-
PartitionMode::CollectLeft,
394-
hash_join.null_equals_null(),
395-
)?)))
386+
Ok(Some(Arc::new(
387+
HashJoinExec::try_new(
388+
Arc::clone(left),
389+
Arc::clone(right),
390+
hash_join.on().to_vec(),
391+
hash_join.filter().cloned(),
392+
hash_join.join_type(),
393+
hash_join.projection.clone(),
394+
PartitionMode::CollectLeft,
395+
hash_join.null_equals_null(),
396+
)?
397+
.with_dynamic_filter_info(hash_join.dynamic_filters_pushdown.clone()),
398+
)))
396399
}
397400
}
398401
(true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(

datafusion/core/src/physical_optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
pub mod coalesce_batches;
2525
pub mod enforce_distribution;
2626
pub mod enforce_sorting;
27+
pub mod join_filter_pushdown;
2728
pub mod join_selection;
2829
pub mod optimizer;
2930
pub mod projection_pushdown;

datafusion/core/src/physical_optimizer/optimizer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use datafusion_physical_optimizer::PhysicalOptimizerRule;
2121
use std::sync::Arc;
2222

23+
use super::join_filter_pushdown::JoinFilterPushdown;
2324
use super::projection_pushdown::ProjectionPushdown;
2425
use super::update_aggr_exprs::OptimizeAggregateOrder;
2526
use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
@@ -112,6 +113,7 @@ impl PhysicalOptimizer {
112113
// given query plan; i.e. it only acts as a final
113114
// gatekeeping rule.
114115
Arc::new(SanityCheckPlan::new()),
116+
Arc::new(JoinFilterPushdown::new()),
115117
];
116118

117119
Self::with_rules(rules)

0 commit comments

Comments
 (0)