Skip to content

Commit 9c24a79

Browse files
authored
[Part3] Partition and Sort Enforcement, Enforcement rule implementation (#4122)
* [Part3] Partition and Sort Enforcement, Enforcement rule implementation * Avoid unnecessary CoalescePartitionsExec in HashJoinExec and CrossJoinExec * Fix join key ordering * Fix join key reordering * join key reordering, handle more operators explicitly * Resolve review comments, add more UT to test reorder_join_keys_to_inputs * add length check in fn expected_expr_positions()
1 parent 30813dc commit 9c24a79

File tree

15 files changed

+2467
-199
lines changed

15 files changed

+2467
-199
lines changed

datafusion/core/src/execution/context.rs

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -73,7 +73,6 @@ use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
7373
use datafusion_sql::{ResolvedTableReference, TableReference};
7474

7575
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
76-
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
7776
use crate::physical_optimizer::repartition::Repartition;
7877

7978
use crate::config::{
@@ -82,6 +81,7 @@ use crate::config::{
8281
};
8382
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
8483
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
84+
use crate::physical_optimizer::enforcement::BasicEnforcement;
8585
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
8686
use crate::physical_plan::planner::DefaultPhysicalPlanner;
8787
use crate::physical_plan::udaf::AggregateUDF;
@@ -1227,6 +1227,8 @@ pub struct SessionConfig {
12271227
pub parquet_pruning: bool,
12281228
/// Should DataFusion collect statistics after listing files
12291229
pub collect_statistics: bool,
1230+
/// Should DataFusion optimizer run a top down process to reorder the join keys
1231+
pub top_down_join_key_reordering: bool,
12301232
/// Configuration options
12311233
pub config_options: Arc<RwLock<ConfigOptions>>,
12321234
/// Opaque extensions.
@@ -1246,6 +1248,7 @@ impl Default for SessionConfig {
12461248
repartition_windows: true,
12471249
parquet_pruning: true,
12481250
collect_statistics: false,
1251+
top_down_join_key_reordering: true,
12491252
config_options: Arc::new(RwLock::new(ConfigOptions::new())),
12501253
// Assume no extensions by default.
12511254
extensions: HashMap::with_capacity_and_hasher(
@@ -1568,6 +1571,7 @@ impl SessionState {
15681571
Arc::new(AggregateStatistics::new()),
15691572
Arc::new(HashBuildProbeOrder::new()),
15701573
];
1574+
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
15711575
if config
15721576
.config_options
15731577
.read()
@@ -1585,7 +1589,8 @@ impl SessionState {
15851589
)));
15861590
}
15871591
physical_optimizers.push(Arc::new(Repartition::new()));
1588-
physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
1592+
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
1593+
// physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
15891594

15901595
SessionState {
15911596
session_id,

datafusion/core/src/physical_optimizer/coalesce_batches.rs

Lines changed: 11 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ use crate::{
2323
physical_optimizer::PhysicalOptimizerRule,
2424
physical_plan::{
2525
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
26-
repartition::RepartitionExec, with_new_children_if_necessary,
26+
repartition::RepartitionExec, rewrite::TreeNodeRewritable,
2727
},
2828
};
2929
use std::sync::Arc;
@@ -48,34 +48,25 @@ impl PhysicalOptimizerRule for CoalesceBatches {
4848
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
4949
_config: &crate::execution::context::SessionConfig,
5050
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
51-
if plan.children().is_empty() {
52-
// leaf node, children cannot be replaced
53-
Ok(plan.clone())
54-
} else {
55-
// recurse down first
56-
let children = plan
57-
.children()
58-
.iter()
59-
.map(|child| self.optimize(child.clone(), _config))
60-
.collect::<Result<Vec<_>>>()?;
61-
let plan = with_new_children_if_necessary(plan, children)?;
51+
let target_batch_size = self.target_batch_size;
52+
plan.transform_up(&|plan| {
53+
let plan_any = plan.as_any();
6254
// The goal here is to detect operators that could produce small batches and only
6355
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
6456
// would be to build the coalescing logic directly into the operators
6557
// See https://github.com/apache/arrow-datafusion/issues/139
66-
let plan_any = plan.as_any();
6758
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
6859
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
6960
|| plan_any.downcast_ref::<RepartitionExec>().is_some();
70-
Ok(if wrap_in_coalesce {
71-
Arc::new(CoalesceBatchesExec::new(
61+
if wrap_in_coalesce {
62+
Some(Arc::new(CoalesceBatchesExec::new(
7263
plan.clone(),
73-
self.target_batch_size,
74-
))
64+
target_batch_size,
65+
)))
7566
} else {
76-
plan.clone()
77-
})
78-
}
67+
None
68+
}
69+
})
7970
}
8071

8172
fn name(&self) -> &str {

0 commit comments

Comments
 (0)