Commit 55730dc
Feat: Add fetch to CoalescePartitionsExec (#14499)
* add fetch info to CoalescePartitionsExec
* use Statistics with_fetch API on CoalescePartitionsExec
* check limit_reached only if fetch is assigned
1 parent 62e23a2 commit 55730dc
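
In short: CoalescePartitionsExec, the operator that merges all input partitions into a single output partition, now carries an optional row limit and implements the ExecutionPlan fetch/with_fetch methods, so the limit-pushdown rule can absorb a GlobalLimitExec into the merge node itself. A minimal usage sketch, assuming `input` is any multi-partition ExecutionPlan and module paths as in the datafusion crate at the time of this commit:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};

/// Merge all partitions of `input`, stopping after the first five rows.
fn merge_with_limit(input: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    let merge = CoalescePartitionsExec::new(input);
    assert_eq!(merge.fetch(), None); // no limit by default

    // `with_fetch` rebuilds the node with the limit attached rather than
    // mutating in place; the node then displays as
    // "CoalescePartitionsExec: fetch=5".
    let limited = merge.with_fetch(Some(5)).expect("fetch is supported");
    println!("{}", displayable(limited.as_ref()).indent(false));
    Ok(limited)
}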

10 files changed: +128 -83 lines

datafusion/core/tests/physical_optimizer/limit_pushdown.rs

Lines changed: 9 additions & 11 deletions

@@ -233,12 +233,11 @@ fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
         LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;

     let expected = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  CoalescePartitionsExec",
-        "    CoalesceBatchesExec: target_batch_size=8192, fetch=5",
-        "      FilterExec: c3@2 > 0",
-        "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-        "          StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
+        "CoalescePartitionsExec: fetch=5",
+        "  CoalesceBatchesExec: target_batch_size=8192, fetch=5",
+        "    FilterExec: c3@2 > 0",
+        "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+        "        StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
     ];
     assert_eq!(get_plan_string(&after_optimize), expected);

@@ -378,11 +377,10 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions() -> R
         LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;

     let expected = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  CoalescePartitionsExec",
-        "    FilterExec: c3@2 > 0",
-        "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-        "        StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
+        "CoalescePartitionsExec: fetch=5",
+        "  FilterExec: c3@2 > 0",
+        "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+        "      StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
     ];
     assert_eq!(get_plan_string(&after_optimize), expected);

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 0 additions & 7 deletions

@@ -69,11 +69,6 @@ async fn explain_analyze_baseline_metrics() {
         "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
         "metrics=[output_rows=99, elapsed_compute="
     );
-    assert_metrics!(
-        &formatted,
-        "GlobalLimitExec: skip=0, fetch=3, ",
-        "metrics=[output_rows=3, elapsed_compute="
-    );
     assert_metrics!(
         &formatted,
         "ProjectionExec: expr=[count(*)",

@@ -101,9 +96,7 @@

     plan.as_any().downcast_ref::<sorts::sort::SortExec>().is_some()
         || plan.as_any().downcast_ref::<physical_plan::aggregates::AggregateExec>().is_some()
-        // CoalescePartitionsExec doesn't do any work so is not included
         || plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
-        || plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
         || plan.as_any().downcast_ref::<physical_plan::limit::LocalLimitExec>().is_some()
         || plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
         || plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()

datafusion/physical-optimizer/src/limit_pushdown.rs

Lines changed: 0 additions & 9 deletions

@@ -146,15 +146,6 @@ pub fn pushdown_limit_helper(
     global_state.skip = skip;
     global_state.fetch = fetch;

-    if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
-        // If the child is a `CoalescePartitionsExec`, we should not remove the limit
-        // the push_down through the `CoalescePartitionsExec` to each partition will not guarantee the limit.
-        // TODO: we may have a better solution if we can support with_fetch for limit inside CoalescePartitionsExec.
-        // Follow-up issue: https://github.com/apache/datafusion/issues/14446
-        global_state.satisfied = true;
-        return Ok((Transformed::no(pushdown_plan), global_state));
-    }
-
     // Now the global state has the most recent information, we can remove
     // the `LimitExec` plan. We will decide later if we should add it again
     // or not.
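
This special case is no longer needed: instead of refusing to push the limit past the merge, the optimizer can now attach the fetch to the CoalescePartitionsExec itself. The removed comment states why pushing the limit into each partition is unsafe; a self-contained illustration of that argument in plain Rust (no DataFusion types):

fn main() {
    let partitions = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]];
    let fetch = 5;

    // Unsafe rewrite: limit each partition, then merge. Three partitions,
    // each capped at 5 rows, can still merge to as many as 15 rows; here
    // the merged output has 9 rows, so LIMIT 5 is violated.
    let per_partition: Vec<i32> = partitions
        .iter()
        .flat_map(|p| p.iter().copied().take(fetch))
        .collect();
    assert_eq!(per_partition.len(), 9);

    // Safe rewrite: merge first, then apply the fetch on the single output
    // partition. This is what `CoalescePartitionsExec: fetch=5` expresses.
    let at_merge: Vec<i32> = partitions.into_iter().flatten().take(fetch).collect();
    assert_eq!(at_merge.len(), 5);
}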

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 40 additions & 10 deletions

@@ -43,6 +43,8 @@ pub struct CoalescePartitionsExec {
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     cache: PlanProperties,
+    /// Optional number of rows to fetch. Stops producing rows after this fetch
+    pub(crate) fetch: Option<usize>,
 }

 impl CoalescePartitionsExec {

@@ -53,6 +55,7 @@ impl CoalescePartitionsExec {
             input,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
+            fetch: None,
         }
     }

@@ -83,9 +86,12 @@ impl DisplayAs for CoalescePartitionsExec {
         f: &mut std::fmt::Formatter,
     ) -> std::fmt::Result {
         match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CoalescePartitionsExec")
-            }
+            DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
+                Some(fetch) => {
+                    write!(f, "CoalescePartitionsExec: fetch={fetch}")
+                }
+                None => write!(f, "CoalescePartitionsExec"),
+            },
         }
     }
 }

@@ -116,9 +122,9 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CoalescePartitionsExec::new(Arc::clone(
-            &children[0],
-        ))))
+        let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
+        plan.fetch = self.fetch;
+        Ok(Arc::new(plan))
     }

     fn execute(

@@ -164,7 +170,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
                 }

                 let stream = builder.build();
-                Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))
+                Ok(Box::pin(ObservedStream::new(
+                    stream,
+                    baseline_metrics,
+                    self.fetch,
+                )))
             }
         }
     }

@@ -174,7 +184,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }

     fn statistics(&self) -> Result<Statistics> {
-        self.input.statistics()
+        Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
     }

     fn supports_limit_pushdown(&self) -> bool {

@@ -197,8 +207,28 @@ impl ExecutionPlan for CoalescePartitionsExec {
             return Ok(None);
         }
         // CoalescePartitionsExec always has a single child, so zero indexing is safe.
-        make_with_child(projection, projection.input().children()[0])
-            .map(|e| Some(Arc::new(CoalescePartitionsExec::new(e)) as _))
+        make_with_child(projection, projection.input().children()[0]).map(|e| {
+            if self.fetch.is_some() {
+                let mut plan = CoalescePartitionsExec::new(e);
+                plan.fetch = self.fetch;
+                Some(Arc::new(plan) as _)
+            } else {
+                Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
+            }
+        })
+    }
+
+    fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
+    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
+        Some(Arc::new(CoalescePartitionsExec {
+            input: Arc::clone(&self.input),
+            fetch: limit,
+            metrics: self.metrics.clone(),
+            cache: self.cache.clone(),
+        }))
     }
 }
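
statistics() now routes the input statistics through Statistics::with_fetch together with the schema, the fetch, a skip of 0, and a partition count of 1, so the reported cardinality respects the limit. The exact semantics live in with_fetch; as a rough sketch of the effect on row counts only (a simplification, not the real implementation, which also tracks estimate precision):

fn clamped_row_count(input_rows: Option<usize>, fetch: Option<usize>) -> Option<usize> {
    match (input_rows, fetch) {
        // A known input row count can never exceed the fetch.
        (Some(rows), Some(fetch)) => Some(rows.min(fetch)),
        // Even an unknown input cardinality is bounded above by the fetch.
        (None, Some(fetch)) => Some(fetch),
        // No fetch: pass the input row count through unchanged.
        (rows, None) => rows,
    }
}

fn main() {
    assert_eq!(clamped_row_count(Some(1_000), Some(5)), Some(5));
    assert_eq!(clamped_row_count(Some(3), Some(5)), Some(3));
    assert_eq!(clamped_row_count(None, None), None);
}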

datafusion/physical-plan/src/stream.rs

Lines changed: 30 additions & 1 deletion

@@ -444,18 +444,44 @@ impl Stream for EmptyRecordBatchStream {
 pub(crate) struct ObservedStream {
     inner: SendableRecordBatchStream,
     baseline_metrics: BaselineMetrics,
+    fetch: Option<usize>,
+    produced: usize,
 }

 impl ObservedStream {
     pub fn new(
         inner: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
+        fetch: Option<usize>,
     ) -> Self {
         Self {
             inner,
             baseline_metrics,
+            fetch,
+            produced: 0,
         }
     }
+
+    fn limit_reached(
+        &mut self,
+        poll: Poll<Option<Result<RecordBatch>>>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        let Some(fetch) = self.fetch else { return poll };
+
+        if self.produced >= fetch {
+            return Poll::Ready(None);
+        }
+
+        if let Poll::Ready(Some(Ok(batch))) = &poll {
+            if self.produced + batch.num_rows() > fetch {
+                let batch = batch.slice(0, fetch.saturating_sub(self.produced));
+                self.produced += batch.num_rows();
+                return Poll::Ready(Some(Ok(batch)));
+            };
+            self.produced += batch.num_rows()
+        }
+        poll
+    }
 }

 impl RecordBatchStream for ObservedStream {

@@ -471,7 +497,10 @@ impl Stream for ObservedStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        let poll = self.inner.poll_next_unpin(cx);
+        let mut poll = self.inner.poll_next_unpin(cx);
+        if self.fetch.is_some() {
+            poll = self.limit_reached(poll);
+        }
         self.baseline_metrics.record_poll(poll)
     }
 }
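
ObservedStream is shared by several operators, so the fetch is optional: CoalescePartitionsExec passes its own fetch, while the UnionExec and InterleaveExec call sites below pass None and behave as before. Per the commit message, limit_reached is consulted only when a fetch is assigned, leaving the unlimited path essentially unchanged. The bookkeeping itself, restated as a self-contained sketch with Vec<i32> standing in for RecordBatch and a subslice copy standing in for RecordBatch::slice:

struct Limited {
    fetch: usize,
    produced: usize,
}

impl Limited {
    /// Mirrors `ObservedStream::limit_reached`: pass batches through until
    /// the fetch boundary, slice the batch that crosses it, then end the
    /// stream (the `None` below plays the role of `Poll::Ready(None)`).
    fn apply(&mut self, batch: Vec<i32>) -> Option<Vec<i32>> {
        if self.produced >= self.fetch {
            return None;
        }
        if self.produced + batch.len() > self.fetch {
            let sliced = batch[..self.fetch - self.produced].to_vec();
            self.produced += sliced.len();
            return Some(sliced);
        }
        self.produced += batch.len();
        Some(batch)
    }
}

fn main() {
    let mut limit = Limited { fetch: 5, produced: 0 };
    assert_eq!(limit.apply(vec![1, 2, 3]), Some(vec![1, 2, 3])); // 3 of 5 rows
    assert_eq!(limit.apply(vec![4, 5, 6]), Some(vec![4, 5])); // sliced at the boundary
    assert_eq!(limit.apply(vec![7, 8]), None); // fetch exhausted
}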

datafusion/physical-plan/src/union.rs

Lines changed: 10 additions & 2 deletions

@@ -237,7 +237,11 @@ impl ExecutionPlan for UnionExec {
         if partition < input.output_partitioning().partition_count() {
             let stream = input.execute(partition, context)?;
             debug!("Found a Union partition to execute");
-            return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+            return Ok(Box::pin(ObservedStream::new(
+                stream,
+                baseline_metrics,
+                None,
+            )));
         } else {
             partition -= input.output_partitioning().partition_count();
         }

@@ -448,7 +452,11 @@ impl ExecutionPlan for InterleaveExec {
             self.schema(),
             input_stream_vec,
         ));
-        return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+        return Ok(Box::pin(ObservedStream::new(
+            stream,
+            baseline_metrics,
+            None,
+        )));
     }

     warn!("Error in InterleaveExec: Partition {} not found", partition);

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 11 additions & 12 deletions

@@ -5032,18 +5032,17 @@ logical_plan
 03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[min(aggregate_test_100.c1)]]
 04)------TableScan: aggregate_test_100 projection=[c1, c3]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-07)------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
-08)--------------CoalesceBatchesExec: target_batch_size=8192
-09)----------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
-10)------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-12)----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
+01)CoalescePartitionsExec: fetch=5
+02)--AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
+05)--------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
+07)------------CoalesceBatchesExec: target_batch_size=8192
+08)--------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+11)--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true


 #

datafusion/sqllogictest/test_files/limit.slt

Lines changed: 2 additions & 3 deletions

@@ -852,9 +852,8 @@ physical_plan
 01)ProjectionExec: expr=[foo@0 as foo]
 02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[false]
 03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
-04)------GlobalLimitExec: skip=0, fetch=1
-05)--------CoalescePartitionsExec
-06)----------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1
+04)------CoalescePartitionsExec: fetch=1
+05)--------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1

 query I
 with selection as (

datafusion/sqllogictest/test_files/repartition.slt

Lines changed: 5 additions & 6 deletions

@@ -121,12 +121,11 @@ logical_plan
 02)--Filter: sink_table.c3 > Int16(0)
 03)----TableScan: sink_table projection=[c1, c2, c3]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----CoalesceBatchesExec: target_batch_size=8192, fetch=5
-04)------FilterExec: c3@2 > 0
-05)--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
-06)----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true
+01)CoalescePartitionsExec: fetch=5
+02)--CoalesceBatchesExec: target_batch_size=8192, fetch=5
+03)----FilterExec: c3@2 > 0
+04)------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
+05)--------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

 # Start repratition on empty column test.
 # See https://github.com/apache/datafusion/issues/12057

datafusion/sqllogictest/test_files/union.slt

Lines changed: 21 additions & 22 deletions

@@ -510,28 +510,27 @@ logical_plan
 19)------------Projection: Int64(1) AS c1
 20)--------------EmptyRelation
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=3
-02)--CoalescePartitionsExec
-03)----UnionExec
-04)------ProjectionExec: expr=[count(*)@0 as cnt]
-05)--------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
-06)----------CoalescePartitionsExec
-07)------------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
-08)--------------ProjectionExec: expr=[]
-09)----------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
-10)------------------CoalesceBatchesExec: target_batch_size=2
-11)--------------------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-12)----------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
-13)------------------------CoalesceBatchesExec: target_batch_size=2
-14)--------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
-15)----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-16)------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
-17)------ProjectionExec: expr=[1 as cnt]
-18)--------PlaceholderRowExec
-19)------ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
-20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
-21)----------ProjectionExec: expr=[1 as c1]
-22)------------PlaceholderRowExec
+01)CoalescePartitionsExec: fetch=3
+02)--UnionExec
+03)----ProjectionExec: expr=[count(*)@0 as cnt]
+04)------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
+05)--------CoalescePartitionsExec
+06)----------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
+07)------------ProjectionExec: expr=[]
+08)--------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
+09)----------------CoalesceBatchesExec: target_batch_size=2
+10)------------------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
+11)--------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
+12)----------------------CoalesceBatchesExec: target_batch_size=2
+13)------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
+14)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+15)----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
+16)----ProjectionExec: expr=[1 as cnt]
+17)------PlaceholderRowExec
+18)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
+19)------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
+20)--------ProjectionExec: expr=[1 as c1]
+21)----------PlaceholderRowExec


 ########
