Skip to content

Commit a31b44e

Browse files
authored
Teach optimizer that CoalesceBatchesExec does not destroy sort order (apache#4332)
1 parent dafd957 commit a31b44e

File tree

2 files changed

+40
-2
lines changed

2 files changed

+40
-2
lines changed

datafusion/core/src/physical_optimizer/enforcement.rs

Lines changed: 38 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1027,7 +1027,9 @@ impl TreeNodeRewritable for PlanWithKeyRequirements {
10271027

10281028
#[cfg(test)]
10291029
mod tests {
1030+
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
10301031
use crate::physical_plan::filter::FilterExec;
1032+
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
10311033
use arrow::compute::SortOptions;
10321034
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
10331035
use datafusion_expr::logical_plan::JoinType;
@@ -1064,6 +1066,12 @@ mod tests {
10641066
}
10651067

10661068
fn parquet_exec() -> Arc<ParquetExec> {
1069+
parquet_exec_with_sort(None)
1070+
}
1071+
1072+
fn parquet_exec_with_sort(
1073+
output_ordering: Option<Vec<PhysicalSortExpr>>,
1074+
) -> Arc<ParquetExec> {
10671075
Arc::new(ParquetExec::new(
10681076
FileScanConfig {
10691077
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
@@ -1074,7 +1082,7 @@ mod tests {
10741082
limit: None,
10751083
table_partition_cols: vec![],
10761084
config_options: ConfigOptions::new().into_shareable(),
1077-
output_ordering: None,
1085+
output_ordering,
10781086
},
10791087
None,
10801088
None,
@@ -2139,4 +2147,33 @@ mod tests {
21392147
assert_optimized!(expected, join);
21402148
Ok(())
21412149
}
2150+
2151+
#[test]
2152+
fn merge_does_not_need_sort() -> Result<()> {
2153+
// see https://github.com/apache/arrow-datafusion/issues/4331
2154+
let schema = schema();
2155+
let sort_key = vec![PhysicalSortExpr {
2156+
expr: col("a", &schema).unwrap(),
2157+
options: SortOptions::default(),
2158+
}];
2159+
2160+
// Scan some sorted parquet files
2161+
let exec = parquet_exec_with_sort(Some(sort_key.clone()));
2162+
2163+
// CoalesceBatchesExec to mimic behavior after a filter
2164+
let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096));
2165+
2166+
// Merge from multiple parquet files and keep the data sorted
2167+
let exec = Arc::new(SortPreservingMergeExec::new(sort_key, exec));
2168+
2169+
// The optimizer should not add an additional SortExec as the
2170+
// data is already sorted
2171+
let expected = &[
2172+
"SortPreservingMergeExec: [a@0 ASC]",
2173+
"CoalesceBatchesExec: target_batch_size=4096",
2174+
"ParquetExec: limit=None, partitions=[x], output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
2175+
];
2176+
assert_optimized!(expected, exec);
2177+
Ok(())
2178+
}
21422179
}

datafusion/core/src/physical_plan/coalesce_batches.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,8 @@ impl ExecutionPlan for CoalesceBatchesExec {
9797
}
9898

9999
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
100-
None
100+
// The coalesce batches operator does not make any changes to the sorting of its input
101+
self.input.output_ordering()
101102
}
102103

103104
fn equivalence_properties(&self) -> EquivalenceProperties {

0 commit comments

Comments
 (0)