Skip to content

Commit e5fd1eb

Browse files
committed
init
1 parent 345117b commit e5fd1eb

File tree

34 files changed

+1293
-37
lines changed

34 files changed

+1293
-37
lines changed

datafusion-cli/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,10 @@ config_namespace! {
636636
/// then the output will be coerced to a non-view.
637637
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
638638
pub expand_views_at_output: bool, default = false
639+
640+
/// When set to true, DataFusion will try to push the build-side statistics
641+
/// of a join down to the probe side.
642+
pub dynamic_join_pushdown: bool, default = true
639643
}
640644
}
641645

datafusion/core/src/datasource/physical_plan/file_stream.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,6 @@
2121
//! Note: Most traits here need to be marked `Sync + Send` to be
2222
//! compliant with the `SendableRecordBatchStream` trait.
2323
24-
use std::collections::VecDeque;
25-
use std::mem;
26-
use std::pin::Pin;
27-
use std::task::{Context, Poll};
28-
2924
use crate::datasource::listing::PartitionedFile;
3025
use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector;
3126
use crate::datasource::physical_plan::{FileMeta, FileScanConfig};
@@ -34,13 +29,19 @@ use crate::physical_plan::metrics::{
3429
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
3530
};
3631
use crate::physical_plan::RecordBatchStream;
32+
use std::collections::VecDeque;
33+
use std::mem;
34+
use std::pin::Pin;
35+
use std::sync::Arc;
36+
use std::task::{Context, Poll};
3737

3838
use arrow::datatypes::SchemaRef;
3939
use arrow::error::ArrowError;
4040
use arrow::record_batch::RecordBatch;
4141
use datafusion_common::instant::Instant;
4242
use datafusion_common::ScalarValue;
4343

44+
use datafusion_physical_plan::joins::DynamicFilterInfo;
4445
use futures::future::BoxFuture;
4546
use futures::stream::BoxStream;
4647
use futures::{ready, FutureExt, Stream, StreamExt};
@@ -95,6 +96,8 @@ pub struct FileStream<F: FileOpener> {
9596
baseline_metrics: BaselineMetrics,
9697
/// Describes the behavior of the `FileStream` if file opening or scanning fails
9798
on_error: OnError,
99+
/// Optional dynamic filter applied to each record batch read by this stream
100+
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
98101
}
99102

100103
/// Represents the state of the next `FileOpenFuture`. Since we need to poll
@@ -272,6 +275,7 @@ impl<F: FileOpener> FileStream<F> {
272275
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
273276
baseline_metrics: BaselineMetrics::new(metrics, partition),
274277
on_error: OnError::Fail,
278+
dynamic_filters: None,
275279
})
276280
}
277281

@@ -283,6 +287,14 @@ impl<F: FileOpener> FileStream<F> {
283287
self.on_error = on_error;
284288
self
285289
}
290+
/// Sets the optional dynamic filter used to filter the record batches of this stream
291+
pub fn with_dynamic_filter(
292+
mut self,
293+
dynamic_filter: Option<Arc<DynamicFilterInfo>>,
294+
) -> Self {
295+
self.dynamic_filters = dynamic_filter;
296+
self
297+
}
286298

287299
/// Begin opening the next file in parallel while decoding the current file in FileStream.
288300
///
@@ -390,7 +402,11 @@ impl<F: FileOpener> FileStream<F> {
390402
}
391403
}
392404
match ready!(reader.poll_next_unpin(cx)) {
393-
Some(Ok(batch)) => {
405+
Some(Ok(mut batch)) => {
406+
// If a dynamic filter is set, use it to filter the batch that was just read
407+
if let Some(dynamic_filters) = &self.dynamic_filters {
408+
batch = dynamic_filters.filter_batch(&batch)?
409+
}
394410
self.file_stream_metrics.time_scanning_until_data.stop();
395411
self.file_stream_metrics.time_scanning_total.stop();
396412
let result = self

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use crate::{
4242
use arrow::datatypes::SchemaRef;
4343
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
4444

45+
use datafusion_physical_plan::joins::DynamicFilterInfo;
46+
use datafusion_physical_plan::Metric;
4547
use itertools::Itertools;
4648
use log::debug;
4749

@@ -282,6 +284,8 @@ pub struct ParquetExec {
282284
table_parquet_options: TableParquetOptions,
283285
/// Optional user defined schema adapter
284286
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
287+
/// Optional dynamic filters (e.g. filters derived from a join)
288+
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
285289
}
286290

287291
impl From<ParquetExec> for ParquetExecBuilder {
@@ -291,7 +295,6 @@ impl From<ParquetExec> for ParquetExecBuilder {
291295
}
292296

293297
/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
294-
///
295298
/// See example on [`ParquetExec`].
296299
pub struct ParquetExecBuilder {
297300
file_scan_config: FileScanConfig,
@@ -463,6 +466,7 @@ impl ParquetExecBuilder {
463466
cache,
464467
table_parquet_options,
465468
schema_adapter_factory,
469+
dynamic_filters: None,
466470
}
467471
}
468472
}
@@ -515,6 +519,7 @@ impl ParquetExec {
515519
cache: _,
516520
table_parquet_options,
517521
schema_adapter_factory,
522+
..
518523
} = self;
519524
ParquetExecBuilder {
520525
file_scan_config: base_config,
@@ -711,10 +716,9 @@ impl DisplayAs for ParquetExec {
711716
)
712717
})
713718
.unwrap_or_default();
714-
715719
write!(f, "ParquetExec: ")?;
716720
self.base_config.fmt_as(t, f)?;
717-
write!(f, "{}{}", predicate_string, pruning_predicate_string,)
721+
write!(f, "{}{}", predicate_string, pruning_predicate_string)
718722
}
719723
}
720724
}
@@ -798,7 +802,16 @@ impl ExecutionPlan for ParquetExec {
798802
.schema_adapter_factory
799803
.clone()
800804
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
801-
805+
if let Some(dynamic_filter) = &self.dynamic_filters {
806+
let (final_expr, name) =
807+
dynamic_filter.final_predicate(self.predicate.clone());
808+
if final_expr.is_some() {
809+
self.metrics.register(Arc::new(Metric::new(
810+
datafusion_physical_plan::metrics::MetricValue::DynamicFilter(name),
811+
None,
812+
)));
813+
}
814+
}
802815
let opener = ParquetOpener {
803816
partition_index,
804817
projection: Arc::from(projection),
@@ -819,7 +832,8 @@ impl ExecutionPlan for ParquetExec {
819832
};
820833

821834
let stream =
822-
FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?;
835+
FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?
836+
.with_dynamic_filter(self.dynamic_filters.clone());
823837

824838
Ok(Box::pin(stream))
825839
}
@@ -862,8 +876,33 @@ impl ExecutionPlan for ParquetExec {
862876
cache: self.cache.clone(),
863877
table_parquet_options: self.table_parquet_options.clone(),
864878
schema_adapter_factory: self.schema_adapter_factory.clone(),
879+
dynamic_filters: self.dynamic_filters.clone(),
865880
}))
866881
}
882+
883+
fn support_dynamic_filter(&self) -> bool {
884+
true
885+
}
886+
887+
fn with_dynamic_filter(
888+
&self,
889+
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
890+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
891+
Ok(Some(Arc::new(ParquetExec {
892+
base_config: self.base_config.clone(),
893+
projected_statistics: self.projected_statistics.clone(),
894+
metrics: self.metrics.clone(),
895+
predicate: self.predicate.clone(),
896+
pruning_predicate: self.pruning_predicate.clone(),
897+
page_pruning_predicate: self.page_pruning_predicate.clone(),
898+
metadata_size_hint: self.metadata_size_hint,
899+
parquet_file_reader_factory: self.parquet_file_reader_factory.clone(),
900+
cache: self.cache.clone(),
901+
table_parquet_options: self.table_parquet_options.clone(),
902+
schema_adapter_factory: self.schema_adapter_factory.clone(),
903+
dynamic_filters,
904+
})))
905+
}
867906
}
868907

869908
fn should_enable_page_index(
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Pushes dynamic join filters down to the scan execution plan, if there are any
19+
20+
use std::sync::Arc;
21+
22+
use crate::datasource::physical_plan::ParquetExec;
23+
use crate::physical_plan::ExecutionPlan;
24+
use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec};
25+
use datafusion_common::tree_node::{Transformed, TransformedResult};
26+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
27+
use datafusion_physical_plan::joins::DynamicFilterInfo;
28+
29+
/// this rule used for pushing the build side statistic down to probe phase
30+
#[derive(Default, Debug)]
31+
pub struct JoinFilterPushdown {}
32+
33+
impl JoinFilterPushdown {
34+
#[allow(missing_docs)]
35+
pub fn new() -> Self {
36+
Self {}
37+
}
38+
}
39+
40+
impl PhysicalOptimizerRule for JoinFilterPushdown {
41+
fn optimize(
42+
&self,
43+
plan: Arc<dyn ExecutionPlan>,
44+
config: &ConfigOptions,
45+
) -> Result<Arc<dyn ExecutionPlan>> {
46+
if !config.optimizer.dynamic_join_pushdown {
47+
return Ok(plan);
48+
}
49+
optimize_impl(plan, &mut None).data()
50+
}
51+
52+
fn name(&self) -> &str {
53+
"JoinFilterPushdown"
54+
}
55+
56+
fn schema_check(&self) -> bool {
57+
true
58+
}
59+
}
60+
61+
fn optimize_impl(
62+
plan: Arc<dyn ExecutionPlan>,
63+
join_filters: &mut Option<Arc<DynamicFilterInfo>>,
64+
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
65+
if let Some(hashjoin_exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
66+
join_filters.clone_from(&hashjoin_exec.dynamic_filters_pushdown);
67+
let new_right = optimize_impl(hashjoin_exec.right.clone(), join_filters)?;
68+
if new_right.transformed {
69+
let new_hash_join = HashJoinExec::try_new(
70+
hashjoin_exec.left().clone(),
71+
new_right.data,
72+
hashjoin_exec.on.clone(),
73+
hashjoin_exec.filter().cloned(),
74+
hashjoin_exec.join_type(),
75+
hashjoin_exec.projection.clone(),
76+
*hashjoin_exec.partition_mode(),
77+
hashjoin_exec.null_equals_null(),
78+
)?
79+
.with_dynamic_filter(hashjoin_exec.dynamic_filters_pushdown.clone())?
80+
.map_or(plan, |f| f);
81+
return Ok(Transformed::yes(new_hash_join));
82+
}
83+
Ok(Transformed::no(plan))
84+
} else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
85+
if let Some(dynamic_filters) = join_filters {
86+
let final_exec = parquet_exec
87+
.clone()
88+
.with_dynamic_filter(Some(dynamic_filters.clone()))?;
89+
if let Some(plan) = final_exec {
90+
return Ok(Transformed::yes(plan));
91+
} else {
92+
return Ok(Transformed::no(plan));
93+
}
94+
}
95+
Ok(Transformed::no(plan))
96+
} else {
97+
let children = plan.children();
98+
let mut new_children = Vec::with_capacity(children.len());
99+
let mut transformed = false;
100+
101+
for child in children {
102+
let new_child = optimize_impl(child.clone(), join_filters)?;
103+
if new_child.transformed {
104+
transformed = true;
105+
}
106+
new_children.push(new_child.data);
107+
}
108+
109+
if transformed {
110+
let new_plan = plan.with_new_children(new_children)?;
111+
Ok(Transformed::yes(new_plan))
112+
} else {
113+
Ok(Transformed::no(plan))
114+
}
115+
}
116+
}

datafusion/core/src/physical_optimizer/join_selection.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ fn try_collect_left(
387387
{
388388
Ok(Some(swap_hash_join(hash_join, PartitionMode::CollectLeft)?))
389389
} else {
390-
Ok(Some(Arc::new(HashJoinExec::try_new(
390+
Ok(HashJoinExec::try_new(
391391
Arc::clone(left),
392392
Arc::clone(right),
393393
hash_join.on().to_vec(),
@@ -396,7 +396,8 @@ fn try_collect_left(
396396
hash_join.projection.clone(),
397397
PartitionMode::CollectLeft,
398398
hash_join.null_equals_null(),
399-
)?)))
399+
)?
400+
.with_dynamic_filter(hash_join.dynamic_filters_pushdown.clone())?)
400401
}
401402
}
402403
(true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(

datafusion/core/src/physical_optimizer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
pub mod coalesce_batches;
2525
pub mod enforce_distribution;
2626
pub mod enforce_sorting;
27+
#[cfg(feature = "parquet")]
28+
pub mod join_filter_pushdown;
2729
pub mod join_selection;
2830
pub mod optimizer;
2931
pub mod projection_pushdown;

datafusion/core/src/physical_optimizer/optimizer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717

1818
//! Physical optimizer traits
1919
20-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
21-
use std::sync::Arc;
22-
20+
#[cfg(feature = "parquet")]
21+
use super::join_filter_pushdown::JoinFilterPushdown;
2322
use super::projection_pushdown::ProjectionPushdown;
2423
use super::update_aggr_exprs::OptimizeAggregateOrder;
2524
use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
@@ -33,6 +32,8 @@ use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggr
3332
use crate::physical_optimizer::output_requirements::OutputRequirements;
3433
use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
3534
use crate::physical_optimizer::topk_aggregation::TopKAggregation;
35+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
36+
use std::sync::Arc;
3637

3738
/// A rule-based physical optimizer.
3839
#[derive(Clone, Debug)]
@@ -112,6 +113,8 @@ impl PhysicalOptimizer {
112113
// given query plan; i.e. it only acts as a final
113114
// gatekeeping rule.
114115
Arc::new(SanityCheckPlan::new()),
116+
#[cfg(feature = "parquet")]
117+
Arc::new(JoinFilterPushdown::new()),
115118
];
116119

117120
Self::with_rules(rules)

0 commit comments

Comments
 (0)