perf: Coalesce batches before sorting in ExternalSorter to reduce merge fan-in #21629
mbutrovich wants to merge 6 commits into apache:main
Conversation
…erge fan-in. Subset of changes from apache#21600.
///
/// Larger values reduce merge fan-in by producing fewer, larger
/// sorted runs.
pub sort_coalesce_target_rows: usize, default = 32768
I wonder if we can make this somewhat adaptive: since we usually load everything in memory, larger batches seem even more favorable for very large sets (e.g. using a 10MiB scratch space for coalescing instead of 32Ki rows would make sense if our data is 1GiB, and might even be faster?)
Yeah, I suspect we'd want to do a good sensitivity analysis across different types and batch sizes for lexsort_to_indices (and eventually the radix sort kernel). We might hit a point of diminishing returns or cache unfriendliness if our coalesced batches get too large.
This design also spills from the sorted runs first, so holding more unsorted rows in the coalescer may make it more likely for us to trigger spilling.
I'm definitely of the mind that we can and should tune this, but it's unclear what a reasonable default would even be right now. In Comet, where we run TPC-H SF 1000, for example, I suspect we'll want longer sorted runs.
run benchmark tpch10

🤖 Benchmark running (GKE): comparing externalsorter (32a6882) to e6b32fe (merge-base) using tpch10
🤖 Benchmark completed (GKE): tpch10 — base (merge-base) vs. branch

Wooh, nice results!
Let's compare to hash join (still a 2x diff, I think).

run benchmark tpch10

🤖 Benchmark running (GKE): comparing externalsorter (32a6882) to e6b32fe (merge-base) using tpch10

I also opened a new PR that scales our existing sort benchmark to larger numbers: #21630. In theory we should merge that first, then run the sort benchmark against
🤖 Benchmark completed (GKE): tpch10 — base (merge-base) vs. branch
run benchmark tpcds tpch_sort
🤖 Criterion benchmark running (GKE): comparing externalsorter (32a6882) to e6b32fe (merge-base)

🤖 Benchmark running (GKE): comparing externalsorter (32a6882) to e6b32fe (merge-base) using tpcds

Benchmark for this request failed.
run benchmark tpcds
🤖 Benchmark running (GKE): comparing externalsorter (32a6882) to e6b32fe (merge-base) using tpcds

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs. branch

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs. branch
## Which issue does this PR close?

- Partially addresses #21543. Also needed to properly evaluate the ExternalSorter refactor in #21629, which improves the merge path.

## Rationale for this change

Current sort benchmarks use 100K rows across 8 partitions (~12.5K rows per partition, ~100KB for integers). This falls below the `sort_in_place_threshold_bytes` (1MB), so the "sort partitioned" benchmarks always take the concat-and-sort-in-place path and never exercise the sort-then-merge path that dominates real workloads.

## What changes are included in this PR?

Parameterizes the sort benchmark on input size, running each case at both 100K rows (existing) and 1M rows (new). At 1M rows, each partition holds ~125K rows (~1MB for integers), which exercises the merge path.

- `INPUT_SIZE` constant replaced with `INPUT_SIZES` array: `[(100_000, "100k"), (1_000_000, "1M")]`
- `DataGenerator` takes `input_size` as a constructor parameter
- All stream generator functions accept `input_size`
- Benchmark names include the size label (e.g. `sort partitioned i64 100k`, `sort partitioned i64 1M`)
- Data distribution and cardinality ratios are preserved across sizes

## Are these changes tested?

Benchmark compiles and runs. No functional test changes.

## Are there any user-facing changes?

No.
run benchmark sort

🤖 Criterion benchmark running (GKE): comparing externalsorter (689dfd2) to d0692b8 (merge-base)
/// `─────────────────' │
/// spills
/// ┌──────────┐ ┌────────────┐ ┌──────┐ ┌────────────┐
/// │ Incoming │────▶│ Batch │────▶│ Sort │────▶│ Sorted Run │
In the current approach, it would be worth it to "push down" the target batch size to the inner operation, so the output of the previous stage is already at the larger batch size instead of requiring another copy.
@andygrove and I have been discussing this in the context of Comet, where for some Spark stages that get translated to native plans we'd possibly want to set different target batch sizes. What you're proposing sounds even more granular, where individual operators could potentially advertise an ideal input batch size, though this could get complicated fast based on schema. Maybe some sort of trait on operators to advertise when they want larger batch sizes, and the optimizer could figure out what that target should be?
Regardless, a general solution seems out of scope for this PR.
Yeah, I think something like that could work.
The propagation doesn't seem too complex, I think: just change the target batch size of the inner op based on the desired one.
I agree it is out of scope.
I fear my sort benchmark scale-up might have killed the benchmark bot :(
    }
}
for batch in &completed {
    self.sort_and_store_run(batch)?;
Nice - I wonder if part of the speedup comes from sorting each batch immediately (while it's still in CPU cache) instead of waiting until the end.
run benchmark sort

🤖 Criterion benchmark running (GKE): comparing externalsorter (0198302) to 5c653be (merge-base)
…21657)

## Which issue does this PR close?

- Closes #.

## Rationale for this change

PR #21620 (commit 5c653be) ported `test_aggregate_dynamic_filter_parquet_e2e` from Rust to sqllogictest using `analyze_categories = 'rows'`, which includes exact pushdown metrics. These metrics are nondeterministic under parallel execution — the order in which Partial aggregates publish dynamic filter updates races against when the scan reads each partition — so the expected output is flaky. Noticed on #21629 ([CI run](https://github.com/apache/datafusion/actions/runs/24472961090/job/71518239089?pr=21629)) and confirmed on main ([CI run](https://github.com/apache/datafusion/actions/runs/24467213913/job/71497866154)).

## What changes are included in this PR?

Switch the `agg_dyn_e2e` test to `analyze_level = summary` + `analyze_categories = 'none'`, suppressing nondeterministic metrics. This matches the approach already used by the other aggregate dynamic filter tests in the same file. The original Rust test only asserted `matched < 4` (i.e. some pruning happened); the important invariant — that the `DynamicFilter [ column1@0 > 4 ]` text and pruning predicate are correct — is still verified.

## Are these changes tested?

Yes — the test itself is what is being fixed.

## Are there any user-facing changes?

No.
Would be nice to see results in

I'll try to run locally with the scaled up

Benchmark Analysis:
| Benchmark | externalsorter | main | Speedup |
|---|---|---|---|
| sort i64 100k | 2.40 ms | 3.88 ms | 1.61x |
| sort f64 100k | 2.61 ms | 3.94 ms | 1.51x |
| sort merge i64 1M | 26.6 ms | 38.5 ms | 1.45x |
| sort i64 1M | 33.8 ms | 45.5 ms | 1.35x |
| sort f64 1M | 35.8 ms | 46.4 ms | 1.30x |
| sort utf8 high cardinality 1M | 59.7 ms | 74.0 ms | 1.24x |
| sort utf8 view tuple 1M | 8.1 ms | 9.1 ms | 1.13x |
Regressions (multi-column StringArray and Dictionary)
| Benchmark | externalsorter | main | Slowdown |
|---|---|---|---|
| sort utf8 tuple 100k (3x StringArray) | 23.7 ms | 9.5 ms | 2.5x |
| sort utf8 tuple 1M (3x StringArray) | 272 ms | 111 ms | 2.5x |
| sort mixed dictionary tuple 1M (3x dict + i64) | 283 ms | 108 ms | 2.6x |
| sort utf8 dictionary tuple 1M (3x dict) | 92.4 ms | 78.5 ms | 1.18x |
Root cause
The coalesce-then-sort pipeline does: coalesce (copy all column data) -> `lexsort_to_indices` -> `take` (random-access scatter-gather) -> chunk back to `batch_size`. With multiple StringArray or Dictionary columns at 32K rows, the `take` scatter-gathers across ~1.9 MB of string heap data, exceeding L2 cache.
Schemas that don't regress help pinpoint the cause:
- Single StringArray (e.g. `sort utf8 high cardinality 1M`, +1.24x): one ~640KB string buffer fits in L2 during `take`
- StringViewArray (e.g. `sort utf8 view tuple 1M`, +1.13x): `take` copies fixed-size 16-byte view structs — no heap random access
- Fixed-width (i64, f64): coalesce is memcpy, `take` is sequential — all cache-friendly
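The sort-indices-then-gather pattern at the heart of this pipeline can be sketched on plain Vecs (these are simplified hypothetical stand-ins, not the arrow-rs `lexsort_to_indices`/`take` kernels):

```rust
// Simplified stand-ins for lexsort_to_indices and take, on plain Vecs.
// For fixed-width values, take is a tight copy loop; for Strings, each
// element clone is a random read into the string heap, which is the
// access pattern behind the multi-column regression described above.
fn sort_to_indices(keys: &[i64]) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..keys.len()).collect();
    idx.sort_by_key(|&i| keys[i]);
    idx
}

fn take<T: Clone>(values: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| values[i].clone()).collect()
}
```

Applying the same index vector to every column is what makes multi-column string schemas expensive: each extra variable-length column adds another round of random heap reads.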
Benchmark batch size
The sort benchmark uses `BATCH_SIZE = 1024`, while DataFusion's default is 8192. This makes the coalesce expand 32x (1024 -> 32768) instead of 4x (8192 -> 32768), amplifying the copy cost. Small batches are a valid scenario though — filters, joins, and sources with pushdown can all produce sub-8192 batches.
Proposed fix
Arrow's `BatchCoalescer` has a `with_biggest_coalesce_batch_size(Some(limit))` option: batches larger than `limit` bypass coalescing and pass through directly. For schemas with non-view variable-length columns (StringArray, BinaryArray, DictionaryArray), set this to `batch_size`. This way:
- Full-size batches (>= 8192 rows) pass through without string copying — sorted directly as individual runs
- Small batches still coalesce to reduce fan-in
- Fixed-width and StringView schemas keep the full 32K coalesce target unchanged
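A minimal sketch of that schema-based heuristic, with a hypothetical `ColKind` standing in for Arrow's `DataType` (this is not DataFusion code, just the selection logic):

```rust
// Hypothetical stand-in for Arrow's DataType, covering only the cases
// the heuristic above distinguishes.
enum ColKind { FixedWidth, Utf8View, Utf8, Binary, Dictionary }

// Choose the pass-through limit for the coalescer: Some(batch_size)
// means already-full batches bypass coalescing (avoiding the string-heap
// scatter-gather); None keeps the full coalesce target for schemas
// where gathering is cache-friendly.
fn biggest_coalesce_limit(schema: &[ColKind], batch_size: usize) -> Option<usize> {
    let has_var_len = schema
        .iter()
        .any(|c| matches!(c, ColKind::Utf8 | ColKind::Binary | ColKind::Dictionary));
    if has_var_len { Some(batch_size) } else { None }
}
```

The returned value would feed `with_biggest_coalesce_batch_size`; note that StringView counts as cache-friendly here because `take` only moves fixed-size view structs.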
Marking as draft while we explore performance trade-offs. I don't want to accidentally merge this yet.
Which issue does this PR close?
Partially addresses #21543. This is the ExternalSorter pipeline refactor from #21600, separated from the radix sort changes.
Rationale for this change
ExternalSorter's merge path sorts each incoming batch individually (typically 8192 rows), then k-way merges all of them. At scale (TPC-H SF10, ~60M rows in lineitem), this produces ~7300 individually-sorted batches feeding the k-way merge with high fan-in.
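The fan-in numbers quoted above follow from simple ceiling division over the run length (a back-of-envelope check, not DataFusion code):

```rust
// Number of sorted runs feeding the k-way merge, given the total row
// count and the rows per sorted run (ceiling division).
fn num_runs(total_rows: u64, run_rows: u64) -> u64 {
    (total_rows + run_rows - 1) / run_rows
}
```

With ~60M lineitem rows, 8192-row runs give ~7300 merge inputs, while 32768-row coalesced runs give ~1800.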
What changes are included in this PR?
Coalesce-then-sort pipeline
Replaces ExternalSorter's buffer-then-sort architecture with a coalesce-then-sort pipeline:
- Buffer incoming batches in a `BatchCoalescer` until `sort_coalesce_target_rows` (default 32768) is reached
- Sort each coalesced batch and slice it back to `batch_size`, producing fewer, larger sorted runs
- Merge the sorted runs with `StreamingMergeBuilder`

This reduces merge fan-in from ~7300 to ~1800 runs at SF10, which is the primary source of speedup.
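The coalesce-then-sort loop can be sketched on plain Vecs (a hypothetical stand-in for `BatchCoalescer` and `RecordBatch`; the real pipeline also slices each run back to `batch_size`, omitted here):

```rust
// Stand-in for sort_coalesce_target_rows; the real default is 32768,
// kept tiny here so the sketch is easy to trace.
const COALESCE_TARGET: usize = 4;

// Accumulate incoming batches until the coalesce target is reached,
// then sort immediately (while the data is hot) and emit a sorted run.
fn coalesce_then_sort(batches: Vec<Vec<i64>>) -> Vec<Vec<i64>> {
    let mut runs = Vec::new();
    let mut buf: Vec<i64> = Vec::new();
    for batch in batches {
        buf.extend(batch);
        if buf.len() >= COALESCE_TARGET {
            let mut run = std::mem::take(&mut buf);
            run.sort();
            runs.push(run);
        }
    }
    if !buf.is_empty() {
        buf.sort(); // flush the final partial run
        runs.push(buf);
    }
    runs // each run is sorted; fewer, larger runs feed the k-way merge
}
```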
Spill strategy

- With a merge reservation (`sort_spill_reservation_bytes > 0`): merge all runs into a single sorted stream before spilling to one file. Fewer files = lower fan-in for the final `MultiLevelMerge`.
- Without a merge reservation (`sort_spill_reservation_bytes == 0`): spill each run as its own file. The multi-level merge handles low merge memory by reducing fan-in, so this no longer fails under tight memory budgets.

Dead code removal
Sorted runs no longer require an in-memory merge before spilling. Removes `in_mem_sort_stream`, `sort_batch_stream`, `consume_and_spill_append`, `spill_finish`, `organize_stringview_arrays`, and `in_progress_spill_file`.

Config changes

- Adds `sort_coalesce_target_rows` (default 32768)
- Deprecates `sort_in_place_threshold_bytes` (no longer read; `warn` attribute per API health policy)

Are these changes tested?
- `information_schema.slt` updated for new config

Are there any user-facing changes?
- New config `sort_coalesce_target_rows` (default 32768) controls the coalesce target before sorting
- `sort_in_place_threshold_bytes` is deprecated
- Tight-memory configurations (`sort_spill_reservation_bytes` near zero) that previously failed now succeed via multi-level merge with reduced fan-in