Skip to content

Commit 80abc94

Browse files
Pipeline-friendly Bounded Memory Window Executor (#4777)
* Sort Removal rule initial commit * move ordering satisfy to the util * update test and change repartition maintain_input_order impl * simplifications * partition by refactor (#28) * partition by refactor * minor changes * Unnecessary tuple to Range conversion is removed * move transpose under common * Add naive sort removal rule * Add todo for finer Sort removal handling * Refactors to improve readability and reduce nesting * reverse expr returns Option (no need for support check) * fix tests * partition by and order by no longer ends up at the same window group * Bounded window exec * solve merge problems * Refactor to simplify code * Better comments, change method names * resolve merge conflicts * Resolve errors introduced by syncing * remove set_state, make ntile debuggable * remove locked flag * address reviews * address reviews * Resolve merge conflict * address reviews * address reviews * address reviews * Add new tests * Update tests * add support for bounded min max * address reviews * rename sort rule * Resolve merge conflicts * refactors * Update fuzzy tests + minor changes * Simplify code and improve comments * Fix imports, make create_schema more functional * address reviews * undo yml change * minor change to pass from CI * resolve merge conflicts * rename some members * Move rule to physical planning * Minor stylistic/comment changes * Simplify batch-merging utility functions * Remove unnecessary clones, simplify code * update cargo lock file * address reviews * update comments * resolve linter error * Tidy up comments after final review Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent e1dc962 commit 80abc94

31 files changed

+2205
-133
lines changed

.github/workflows/rust.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ jobs:
6464
- name: Check Cargo.lock for datafusion-cli
6565
run: |
6666
# If this test fails, try running `cargo update` in the `datafusion-cli` directory
67+
# and check in the updated Cargo.lock file.
6768
cargo check --manifest-path datafusion-cli/Cargo.toml --locked
6869
6970
# test the crate

datafusion-cli/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ flate2 = { version = "1.0.24", optional = true }
7575
futures = "0.3"
7676
glob = "0.3.0"
7777
hashbrown = { version = "0.13", features = ["raw"] }
78+
indexmap = "1.9.2"
7879
itertools = "0.10"
7980
lazy_static = { version = "^1.4.0" }
8081
log = "^0.4"

datafusion/core/src/execution/context.rs

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,44 +1453,43 @@ impl SessionState {
14531453
// We need to take care of the rule ordering. They may influence each other.
14541454
let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
14551455
Arc::new(AggregateStatistics::new()),
1456-
// - In order to increase the parallelism, it will change the output partitioning
1457-
// of some operators in the plan tree, which will influence other rules.
1458-
// Therefore, it should be run as soon as possible.
1459-
// - The reason to make it optional is
1460-
// - it's not used for the distributed engine, Ballista.
1461-
// - it's conflicted with some parts of the BasicEnforcement, since it will
1462-
// introduce additional repartitioning while the BasicEnforcement aims at
1463-
// reducing unnecessary repartitioning.
1456+
// In order to increase the parallelism, the Repartition rule will change the
1457+
// output partitioning of some operators in the plan tree, which will influence
1458+
// other rules. Therefore, it should run as soon as possible. It is optional because:
1459+
// - It's not used for the distributed engine, Ballista.
1460+
// - It's conflicted with some parts of the BasicEnforcement, since it will
1461+
// introduce additional repartitioning while the BasicEnforcement aims at
1462+
// reducing unnecessary repartitioning.
14641463
Arc::new(Repartition::new()),
1465-
//- Currently it will depend on the partition number to decide whether to change the
1466-
// single node sort to parallel local sort and merge. Therefore, it should be run
1467-
// after the Repartition.
1468-
// - Since it will change the output ordering of some operators, it should be run
1464+
// - Currently it will depend on the partition number to decide whether to change the
1465+
// single node sort to parallel local sort and merge. Therefore, GlobalSortSelection
1466+
// should run after the Repartition.
1467+
// - Since it will change the output ordering of some operators, it should run
14691468
// before JoinSelection and BasicEnforcement, which may depend on that.
14701469
Arc::new(GlobalSortSelection::new()),
1471-
// Statistics-base join selection will change the Auto mode to real join implementation,
1470+
// Statistics-based join selection will change the Auto mode to a real join implementation,
14721471
// like collect left, or hash join, or future sort merge join, which will
14731472
// influence the BasicEnforcement to decide whether to add additional repartition
14741473
// and local sort to meet the distribution and ordering requirements.
1475-
// Therefore, it should be run before BasicEnforcement
1474+
// Therefore, it should run before BasicEnforcement.
14761475
Arc::new(JoinSelection::new()),
14771476
// If the query is processing infinite inputs, the PipelineFixer rule applies the
14781477
// necessary transformations to make the query runnable (if it is not already runnable).
14791478
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
14801479
// Since the transformations it applies may alter output partitioning properties of operators
14811480
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
14821481
Arc::new(PipelineFixer::new()),
1483-
// It's for adding essential repartition and local sorting operator to satisfy the
1484-
// required distribution and local sort.
1482+
// BasicEnforcement is for adding essential repartition and local sorting operators
1483+
// to satisfy the required distribution and local sort requirements.
14851484
// Please make sure that the whole plan tree is determined.
14861485
Arc::new(BasicEnforcement::new()),
1487-
// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
1488-
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
1489-
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
1490-
// rule below performs this analysis and removes unnecessary `SortExec`s.
1486+
// The BasicEnforcement stage conservatively inserts sorts to satisfy ordering requirements.
1487+
// However, a deeper analysis may sometimes reveal that such a sort is actually unnecessary.
1488+
// These cases typically arise when we have reversible window expressions or deep subqueries.
1489+
// The rule below performs this analysis and removes unnecessary sorts.
14911490
Arc::new(OptimizeSorts::new()),
1492-
// It will not influence the distribution and ordering of the whole plan tree.
1493-
// Therefore, to avoid influencing other rules, it should be run at last.
1491+
// The CoalesceBatches rule will not influence the distribution and ordering of the
1492+
// whole plan tree. Therefore, to avoid influencing other rules, it should run last.
14941493
Arc::new(CoalesceBatches::new()),
14951494
// The PipelineChecker rule will reject non-runnable query plans that use
14961495
// pipeline-breaking operators on infinite input(s). The rule generates a

datafusion/core/src/physical_optimizer/optimize_sorts.rs

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,11 @@ use crate::physical_optimizer::utils::{
3333
use crate::physical_optimizer::PhysicalOptimizerRule;
3434
use crate::physical_plan::rewrite::TreeNodeRewritable;
3535
use crate::physical_plan::sorts::sort::SortExec;
36-
use crate::physical_plan::windows::WindowAggExec;
36+
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
3737
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
3838
use arrow::datatypes::SchemaRef;
3939
use datafusion_common::{reverse_sort_options, DataFusionError};
40+
use datafusion_physical_expr::window::WindowExpr;
4041
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
4142
use itertools::izip;
4243
use std::iter::zip;
@@ -181,17 +182,32 @@ fn optimize_sorts(
181182
sort_exec.input().equivalence_properties()
182183
}) {
183184
update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
184-
} else if let Some(window_agg_exec) =
185+
}
186+
// For window expressions, we can remove some sorts when we can
187+
// calculate the result in reverse:
188+
else if let Some(exec) =
185189
requirements.plan.as_any().downcast_ref::<WindowAggExec>()
186190
{
187-
// For window expressions, we can remove some sorts when we can
188-
// calculate the result in reverse:
189-
if let Some(res) = analyze_window_sort_removal(
190-
window_agg_exec,
191+
if let Some(result) = analyze_window_sort_removal(
192+
exec.window_expr(),
193+
&exec.partition_keys,
194+
sort_exec,
195+
sort_onwards,
196+
)? {
197+
return Ok(Some(result));
198+
}
199+
} else if let Some(exec) = requirements
200+
.plan
201+
.as_any()
202+
.downcast_ref::<BoundedWindowAggExec>()
203+
{
204+
if let Some(result) = analyze_window_sort_removal(
205+
exec.window_expr(),
206+
&exec.partition_keys,
191207
sort_exec,
192208
sort_onwards,
193209
)? {
194-
return Ok(Some(res));
210+
return Ok(Some(result));
195211
}
196212
}
197213
// TODO: Once we can ensure that required ordering information propagates with
@@ -273,9 +289,11 @@ fn analyze_immediate_sort_removal(
273289
Ok(None)
274290
}
275291

276-
/// Analyzes a `WindowAggExec` to determine whether it may allow removing a sort.
292+
/// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether
293+
/// it may allow removing a sort.
277294
fn analyze_window_sort_removal(
278-
window_agg_exec: &WindowAggExec,
295+
window_expr: &[Arc<dyn WindowExpr>],
296+
partition_keys: &[Arc<dyn PhysicalExpr>],
279297
sort_exec: &SortExec,
280298
sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
281299
) -> Result<Option<PlanWithCorrespondingSort>> {
@@ -289,7 +307,6 @@ fn analyze_window_sort_removal(
289307
// If there is no physical ordering, there is no way to remove a sort -- immediately return:
290308
return Ok(None);
291309
};
292-
let window_expr = window_agg_exec.window_expr();
293310
let (can_skip_sorting, should_reverse) = can_skip_sort(
294311
window_expr[0].partition_by(),
295312
required_ordering,
@@ -308,13 +325,26 @@ fn analyze_window_sort_removal(
308325
if let Some(window_expr) = new_window_expr {
309326
let new_child = remove_corresponding_sort_from_sub_plan(sort_onward)?;
310327
let new_schema = new_child.schema();
311-
let new_plan = Arc::new(WindowAggExec::try_new(
312-
window_expr,
313-
new_child,
314-
new_schema,
315-
window_agg_exec.partition_keys.clone(),
316-
Some(physical_ordering.to_vec()),
317-
)?);
328+
329+
let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
330+
// If all window exprs can run with bounded memory choose bounded window variant
331+
let new_plan = if uses_bounded_memory {
332+
Arc::new(BoundedWindowAggExec::try_new(
333+
window_expr,
334+
new_child,
335+
new_schema,
336+
partition_keys.to_vec(),
337+
Some(physical_ordering.to_vec()),
338+
)?) as _
339+
} else {
340+
Arc::new(WindowAggExec::try_new(
341+
window_expr,
342+
new_child,
343+
new_schema,
344+
partition_keys.to_vec(),
345+
Some(physical_ordering.to_vec()),
346+
)?) as _
347+
};
318348
return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
319349
}
320350
}

datafusion/core/src/physical_optimizer/pipeline_checker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ mod sql_tests {
301301
let case = QueryCase {
302302
sql: "SELECT
303303
c9,
304-
SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
304+
SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
305305
FROM test
306306
LIMIT 5".to_string(),
307307
cases: vec![Arc::new(test1), Arc::new(test2)],
@@ -325,7 +325,7 @@ mod sql_tests {
325325
let case = QueryCase {
326326
sql: "SELECT
327327
c9,
328-
SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
328+
SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1
329329
FROM test".to_string(),
330330
cases: vec![Arc::new(test1), Arc::new(test2)],
331331
error_operator: "Window Error".to_string()

datafusion/core/src/physical_plan/common.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::error::{DataFusionError, Result};
2222
use crate::execution::context::TaskContext;
2323
use crate::physical_plan::metrics::MemTrackingMetrics;
2424
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
25+
use arrow::compute::concat;
2526
use arrow::datatypes::{Schema, SchemaRef};
2627
use arrow::error::ArrowError;
2728
use arrow::error::Result as ArrowResult;
@@ -95,6 +96,47 @@ pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatc
9596
.map_err(DataFusionError::from)
9697
}
9798

99+
/// Merge two record batch references into a single record batch.
100+
/// All the record batches inside the slice must have the same schema.
101+
pub fn merge_batches(
102+
first: &RecordBatch,
103+
second: &RecordBatch,
104+
schema: SchemaRef,
105+
) -> ArrowResult<RecordBatch> {
106+
let columns = (0..schema.fields.len())
107+
.map(|index| {
108+
let first_column = first.column(index).as_ref();
109+
let second_column = second.column(index).as_ref();
110+
concat(&[first_column, second_column])
111+
})
112+
.collect::<ArrowResult<Vec<_>>>()?;
113+
RecordBatch::try_new(schema, columns)
114+
}
115+
116+
/// Merge a slice of record batch references into a single record batch, or
117+
/// return None if the slice itself is empty. All the record batches inside the
118+
/// slice must have the same schema.
119+
pub fn merge_multiple_batches(
120+
batches: &[&RecordBatch],
121+
schema: SchemaRef,
122+
) -> ArrowResult<Option<RecordBatch>> {
123+
Ok(if batches.is_empty() {
124+
None
125+
} else {
126+
let columns = (0..schema.fields.len())
127+
.map(|index| {
128+
concat(
129+
&batches
130+
.iter()
131+
.map(|batch| batch.column(index).as_ref())
132+
.collect::<Vec<_>>(),
133+
)
134+
})
135+
.collect::<ArrowResult<Vec<_>>>()?;
136+
Some(RecordBatch::try_new(schema, columns)?)
137+
})
138+
}
139+
98140
/// Recursively builds a list of files in a directory with a given extension
99141
pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
100142
let mut filenames: Vec<String> = Vec::new();

datafusion/core/src/physical_plan/planner.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
4747
use crate::physical_plan::projection::ProjectionExec;
4848
use crate::physical_plan::repartition::RepartitionExec;
4949
use crate::physical_plan::sorts::sort::SortExec;
50-
use crate::physical_plan::windows::WindowAggExec;
50+
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
5151
use crate::physical_plan::{joins::utils as join_utils, Partitioning};
5252
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
5353
use crate::{
@@ -614,13 +614,28 @@ impl DefaultPhysicalPlanner {
614614
})
615615
.collect::<Result<Vec<_>>>()?;
616616

617-
Ok(Arc::new(WindowAggExec::try_new(
618-
window_expr,
619-
input_exec,
620-
physical_input_schema,
621-
physical_partition_keys,
622-
physical_sort_keys,
623-
)?))
617+
let uses_bounded_memory = window_expr
618+
.iter()
619+
.all(|e| e.uses_bounded_memory());
620+
// If all window expressions can run with bounded memory,
621+
// choose the bounded window variant:
622+
Ok(if uses_bounded_memory {
623+
Arc::new(BoundedWindowAggExec::try_new(
624+
window_expr,
625+
input_exec,
626+
physical_input_schema,
627+
physical_partition_keys,
628+
physical_sort_keys,
629+
)?)
630+
} else {
631+
Arc::new(WindowAggExec::try_new(
632+
window_expr,
633+
input_exec,
634+
physical_input_schema,
635+
physical_partition_keys,
636+
physical_sort_keys,
637+
)?)
638+
})
624639
}
625640
LogicalPlan::Aggregate(Aggregate {
626641
input,

0 commit comments

Comments
 (0)