Skip to content

Commit 1aeb19a

Browse files
committed
fix failed test, add analyze info, fix compile error
1 parent 333c57b commit 1aeb19a

File tree

11 files changed

+60
-20
lines changed

11 files changed

+60
-20
lines changed

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use arrow::datatypes::SchemaRef;
4343
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
4444

4545
use datafusion_physical_plan::joins::DynamicFilterInfo;
46+
use datafusion_physical_plan::Metric;
4647
use itertools::Itertools;
4748
use log::debug;
4849

@@ -802,7 +803,15 @@ impl ExecutionPlan for ParquetExec {
802803
.clone()
803804
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
804805
let final_predicate = if let Some(dynamic_filter) = &self.dynamic_filters {
805-
dynamic_filter.final_predicate(self.predicate.clone())
806+
let (final_expr, name) =
807+
dynamic_filter.final_predicate(self.predicate.clone());
808+
if let Some(_) = &final_expr {
809+
self.metrics.register(Arc::new(Metric::new(
810+
datafusion_physical_plan::metrics::MetricValue::DynamicFilter(name),
811+
None,
812+
)));
813+
}
814+
final_expr
806815
} else {
807816
self.predicate.clone()
808817
};

datafusion/core/src/physical_optimizer/join_filter_pushdown.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
2020
use std::sync::Arc;
2121

22-
#[cfg(feature = "parquet")]
2322
use crate::datasource::physical_plan::ParquetExec;
2423
use crate::physical_plan::ExecutionPlan;
2524
use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec};

datafusion/core/src/physical_optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
pub mod coalesce_batches;
2525
pub mod enforce_distribution;
2626
pub mod enforce_sorting;
27+
#[cfg(feature = "parquet")]
2728
pub mod join_filter_pushdown;
2829
pub mod join_selection;
2930
pub mod optimizer;

datafusion/core/src/physical_optimizer/optimizer.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
//! Physical optimizer traits
1919
20-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
21-
use std::sync::Arc;
22-
20+
#[cfg(feature = "parquet")]
2321
use super::join_filter_pushdown::JoinFilterPushdown;
2422
use super::projection_pushdown::ProjectionPushdown;
2523
use super::update_aggr_exprs::OptimizeAggregateOrder;
@@ -34,6 +32,8 @@ use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggr
3432
use crate::physical_optimizer::output_requirements::OutputRequirements;
3533
use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
3634
use crate::physical_optimizer::topk_aggregation::TopKAggregation;
35+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
36+
use std::sync::Arc;
3737

3838
/// A rule-based physical optimizer.
3939
#[derive(Clone, Debug)]
@@ -113,6 +113,7 @@ impl PhysicalOptimizer {
113113
// given query plan; i.e. it only acts as a final
114114
// gatekeeping rule.
115115
Arc::new(SanityCheckPlan::new()),
116+
#[cfg(feature = "parquet")]
116117
Arc::new(JoinFilterPushdown::new()),
117118
];
118119

datafusion/core/src/physical_planner.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ use datafusion_expr::expr::{
7878
use datafusion_expr::expr_rewriter::unnormalize_cols;
7979
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
8080
use datafusion_expr::{
81-
DescribeTable, DmlStatement, Extension, Filter, JoinType, RecursiveQuery, SortExpr,
82-
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
81+
DescribeTable, DmlStatement, Extension, Filter, JoinType, RecursiveQuery, SortExpr, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp
8382
};
8483
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
8584
use datafusion_physical_expr::expressions::{Column, Literal};
@@ -1110,17 +1109,16 @@ impl DefaultPhysicalPlanner {
11101109
null_equals_null,
11111110
)?)
11121111
};
1113-
11141112
// build dynamic filter
1115-
if join.support_dynamic_filter() {
1113+
if join.support_dynamic_filter() && dynamic_pushdown_columns.as_ref().is_some_and(|columns| !columns.is_empty()) {
11161114
let physical_dynamic_filter_info: Option<Arc<DynamicFilterInfo>> =
11171115
if let Some(dynamic_columns) = dynamic_pushdown_columns {
11181116
let columns_and_types_and_names: Vec<(Arc<Column>, String)> =
11191117
dynamic_columns
11201118
.iter()
11211119
.map(|dynamic_column| {
11221120
let column = dynamic_column.column();
1123-
let index = join_schema.index_of_column(column)?;
1121+
let index = join.schema().index_of(column.name())?;
11241122
let physical_column = Arc::new(
11251123
datafusion_physical_expr::expressions::Column::new(
11261124
&column.name,

datafusion/optimizer/src/join_filter_pushdown.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ impl OptimizerRule for JoinFilterPushdown {
4747
plan: LogicalPlan,
4848
config: &dyn OptimizerConfig,
4949
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
50-
if !config.options().optimizer.dynamic_join_pushdown {
50+
if !config.options().optimizer.dynamic_join_pushdown
51+
//|| !config.options().execution.parquet.pushdown_filters
52+
{
5153
return Ok(Transformed::no(plan));
5254
}
5355

@@ -116,7 +118,11 @@ impl OptimizerRule for JoinFilterPushdown {
116118
fn unsupported_join_type(join_type: &JoinType) -> bool {
117119
matches!(
118120
join_type,
119-
JoinType::Left | JoinType::RightSemi | JoinType::RightAnti
121+
JoinType::Left
122+
| JoinType::RightSemi
123+
| JoinType::RightAnti
124+
| JoinType::LeftSemi
125+
| JoinType::LeftAnti
120126
)
121127
}
122128

datafusion/physical-plan/src/joins/dynamic_filters.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -230,19 +230,26 @@ impl DynamicFilterInfo {
230230
pub fn final_predicate(
231231
&self,
232232
predicate: Option<Arc<dyn PhysicalExpr>>,
233-
) -> Option<Arc<dyn PhysicalExpr>> {
233+
) -> (Option<Arc<dyn PhysicalExpr>>, String) {
234234
let inner = self.inner.lock();
235235

236-
match (inner.final_expr.clone(), predicate) {
237-
(Some(self_expr), Some(input_expr)) => Some(Arc::new(BinaryExpr::new(
238-
self_expr,
239-
Operator::And,
240-
input_expr,
241-
))),
236+
let result = match (inner.final_expr.clone(), predicate) {
237+
(Some(self_expr), Some(input_expr)) => {
238+
Some(
239+
Arc::new(BinaryExpr::new(self_expr, Operator::And, input_expr))
240+
as Arc<dyn PhysicalExpr>,
241+
)
242+
}
242243
(Some(self_expr), None) => Some(self_expr),
243244
(None, Some(input_expr)) => Some(input_expr),
244245
(None, None) => None,
245-
}
246+
};
247+
let debug_info = inner
248+
.final_expr
249+
.as_ref()
250+
.map(|expr| format!("{}", expr))
251+
.map_or("".to_string(), |name| name);
252+
(result, debug_info)
246253
}
247254

248255
// used for adding partition numbers

datafusion/physical-plan/src/metrics/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ impl MetricsSet {
263263
MetricValue::Gauge { name, .. } => name == metric_name,
264264
MetricValue::StartTimestamp(_) => false,
265265
MetricValue::EndTimestamp(_) => false,
266+
MetricValue::DynamicFilter(_) => false,
266267
})
267268
}
268269

datafusion/physical-plan/src/metrics/value.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,9 @@ pub enum MetricValue {
400400
StartTimestamp(Timestamp),
401401
/// The time at which execution ended
402402
EndTimestamp(Timestamp),
403+
404+
/// Dynamic filters
405+
DynamicFilter(String),
403406
}
404407

405408
impl MetricValue {
@@ -417,6 +420,7 @@ impl MetricValue {
417420
Self::Time { name, .. } => name.borrow(),
418421
Self::StartTimestamp(_) => "start_timestamp",
419422
Self::EndTimestamp(_) => "end_timestamp",
423+
Self::DynamicFilter(_) => "dynamic_filters",
420424
}
421425
}
422426

@@ -442,6 +446,7 @@ impl MetricValue {
442446
.and_then(|ts| ts.timestamp_nanos_opt())
443447
.map(|nanos| nanos as usize)
444448
.unwrap_or(0),
449+
Self::DynamicFilter(_) => 1,
445450
}
446451
}
447452

@@ -469,6 +474,7 @@ impl MetricValue {
469474
},
470475
Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
471476
Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
477+
Self::DynamicFilter(name) => Self::DynamicFilter(name.clone()),
472478
}
473479
}
474480

@@ -515,6 +521,7 @@ impl MetricValue {
515521
(Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
516522
timestamp.update_to_max(other_timestamp);
517523
}
524+
(Self::DynamicFilter(_), _) => {}
518525
m @ (_, _) => {
519526
panic!(
520527
"Mismatched metric types. Can not aggregate {:?} with value {:?}",
@@ -539,6 +546,7 @@ impl MetricValue {
539546
Self::Time { .. } => 8,
540547
Self::StartTimestamp(_) => 9, // show timestamps last
541548
Self::EndTimestamp(_) => 10,
549+
Self::DynamicFilter(_) => 11,
542550
}
543551
}
544552

@@ -574,6 +582,9 @@ impl Display for MetricValue {
574582
Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
575583
write!(f, "{timestamp}")
576584
}
585+
Self::DynamicFilter(filter) => {
586+
write!(f, "dynamic_filter: {filter}")
587+
}
577588
}
578589
}
579590
}

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE
207207
logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
208208
logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE
209209
logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c]
210+
logical_plan after join_filter_pushdown SAME TEXT AS ABOVE
210211
logical_plan after eliminate_nested_union SAME TEXT AS ABOVE
211212
logical_plan after simplify_expressions SAME TEXT AS ABOVE
212213
logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE
@@ -234,6 +235,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE
234235
logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
235236
logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE
236237
logical_plan after optimize_projections SAME TEXT AS ABOVE
238+
logical_plan after join_filter_pushdown SAME TEXT AS ABOVE
237239
logical_plan TableScan: simple_explain_test projection=[a, b, c]
238240
initial_physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
239241
initial_physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]
@@ -255,6 +257,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
255257
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
256258
physical_plan after LimitPushdown SAME TEXT AS ABOVE
257259
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
260+
physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE
258261
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
259262
physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]
260263
physical_plan_with_schema CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, schema=[a:Int32;N, b:Int32;N, c:Int32;N]
@@ -331,6 +334,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
331334
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
332335
physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
333336
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
337+
physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE
334338
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
335339
physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
336340

@@ -371,6 +375,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
371375
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
372376
physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
373377
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
378+
physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE
374379
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
375380
physical_plan_with_stats ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
376381
physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ datafusion.explain.show_sizes true
223223
datafusion.explain.show_statistics false
224224
datafusion.optimizer.allow_symmetric_joins_without_pruning true
225225
datafusion.optimizer.default_filter_selectivity 20
226+
datafusion.optimizer.dynamic_join_pushdown true
226227
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
227228
datafusion.optimizer.enable_round_robin_repartition true
228229
datafusion.optimizer.enable_topk_aggregation true
@@ -314,6 +315,7 @@ datafusion.explain.show_sizes true When set to true, the explain statement will
314315
datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans
315316
datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors.
316317
datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected).
318+
datafusion.optimizer.dynamic_join_pushdown true when set to true, datafusion would try to push the build side statistic to probe phase
317319
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
318320
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
319321
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible

0 commit comments

Comments
 (0)