Skip to content

feat(streaming-merge): per-region engine + multi-output sorted_series splitting#6424

Open
g-talbot wants to merge 4 commits into
gtt/parquet-streaming-basefrom
gtt/streaming-merge-engine-multi-rg-1-engine
Open

feat(streaming-merge): per-region engine + multi-output sorted_series splitting#6424
g-talbot wants to merge 4 commits into
gtt/parquet-streaming-basefrom
gtt/streaming-merge-engine-multi-rg-1-engine

Conversation

@g-talbot
Copy link
Copy Markdown
Contributor

Slice 1 of 3 from the former PR #6410. Stacked underneath #6425 (adapter) and #6426 (hardening).

The streaming merge engine's per-region foundation. Refactors the engine from "one big merge" to "one region per prefix-key value", and adds intra-region splitting at sorted_series transitions so prefix_len=0 multi-output cases honor the requested file count.

What's in here

  • New module merge/streaming/region_grouping.rs: extracts regions from input metadata by composite prefix key (BTreeMap-driven), validates MS-2 (region order matches each input's physical RG order), and verifies PA-3 uniqueness via assert_unique_rg_prefix_keys.
  • New module merge/streaming/body_assembler.rs: page-bounded body-col write driven from per-input page caches.
  • New module merge/streaming/output.rs: per-output writer + finalize that derives row_keys / zonemap / metric_names from the rows that landed in that output.
  • Refactor merge/streaming.rs from a single-region engine to a per-region processor with sub-region splitting.
  • Composite prefix key encoding: escape-encoded bytes (0x00 → 0x00 0x01, terminator 0x00 0x00) with bytewise complement for DESC columns, so BTreeMap iteration matches the declared sort order across any composite of leaf primitives.
  • Up-front rejection of multi-RG legacy inputs (prefix_len=0 + num_RGs>1) — those must go through the adapter (next slice).
  • Up-front rejection of duplicate composite prefix keys within a single input (05dfb).
  • split_region_at_sorted_series: when prefix_len=0 + num_outputs > 1, walks the merge order and splits at sorted_series transitions so even single-prefix inputs honor the file count. Single sorted_series runs are never broken.

Test plan

  • cargo test -p quickwit-parquet-engine --all-features — 476 unit tests pass at this slice's HEAD.
  • MS-2 rejection test: a file whose physical RG order disagrees with the BTreeMap-derived region order is rejected up-front, not allowed to crash mid-merge.
  • MS-7 page-cache bound test: peak resident pages stays ≤ small constant across 300/3000/30000-row fixtures.
  • MC-1 / MC-2 / MC-3 / MC-4 proptests on the regular merge path remain green.
  • Composite-key encoding tests for two byte-array cols, mixed ASC/DESC direction, length-variance.
  • PA-3 duplicate-prefix rejection test.

Out of scope

🤖 Generated with Claude Code

@g-talbot g-talbot requested a review from a team as a code owner May 13, 2026 15:23
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 61163cd to eac4f9c Compare May 14, 2026 13:48
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg-1-engine branch from 0c3ae7c to bc10992 Compare May 14, 2026 13:48
g-talbot added a commit that referenced this pull request May 14, 2026
…ocstrings

Addresses adamtobey's review on PR-6409.

- Rename `advance_decoder_to_row` → `fill_page_cache_to_row`. The
  function's effect on the world is "add pages to the per-input cache"
  — it never advances a cursor or skips data. The old name primed
  reviewers to ask "are we skipping rows?" (which is exactly what
  Adam asked).
- Use a `rows_for_current_output` register inside
  `compute_input_row_destinations` and write to
  `rows_per_output[out_idx]` once after the inner loop; saves the
  per-row indexed store.
- Expand `body_col_page_cache` docstring with the horizontal-vs-vertical
  memory bound argument and a pointer to the MS-7 invariant test
  (`test_ms7_body_col_page_cache_bounded_regardless_of_input_size`).
- Add context comments at the cross-file invariant sites Adam flagged:
  - Sort-cols-first storage-ordering contract on the sort-col drain.
  - Single-RG-input restriction with forward pointer to PR-6c.2
    (#6424) which relaxes it.
  - `rg_partition_prefix_len` defaulting to 0 (with reference to the
    legacy-promotion `mixed_prefix_ok` escape in PR-6423).

No behaviour change. 461 lib tests pass; workspace clippy + nightly
fmt + rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg-1-engine branch 2 times, most recently from 2085c8d to ceed395 Compare May 14, 2026 14:49
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 4d246e9 to d5118cf Compare May 14, 2026 17:15
@g-talbot g-talbot requested a review from a team as a code owner May 14, 2026 17:15
g-talbot added a commit that referenced this pull request May 14, 2026
…ocstrings

Addresses adamtobey's review on PR-6409.

- Rename `advance_decoder_to_row` → `fill_page_cache_to_row`. The
  function's effect on the world is "add pages to the per-input cache"
  — it never advances a cursor or skips data. The old name primed
  reviewers to ask "are we skipping rows?" (which is exactly what
  Adam asked).
- Use a `rows_for_current_output` register inside
  `compute_input_row_destinations` and write to
  `rows_per_output[out_idx]` once after the inner loop; saves the
  per-row indexed store.
- Expand `body_col_page_cache` docstring with the horizontal-vs-vertical
  memory bound argument and a pointer to the MS-7 invariant test
  (`test_ms7_body_col_page_cache_bounded_regardless_of_input_size`).
- Add context comments at the cross-file invariant sites Adam flagged:
  - Sort-cols-first storage-ordering contract on the sort-col drain.
  - Single-RG-input restriction with forward pointer to PR-6c.2
    (#6424) which relaxes it.
  - `rg_partition_prefix_len` defaulting to 0 (with reference to the
    legacy-promotion `mixed_prefix_ok` escape in PR-6423).

No behaviour change. 461 lib tests pass; workspace clippy + nightly
fmt + rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg-1-engine branch from ceed395 to 7b8317b Compare May 14, 2026 17:16
g-talbot added a commit that referenced this pull request May 14, 2026
… (PR-6b.2) (#6409)

* feat: streaming column-major merge engine with page-bounded body cols (PR-6b.2)

Rebuilds PR-6b on top of PR-6a.2's per-page Arrow decoder. The
streaming merge engine now keeps body-col memory bounded by output
page size (not column-chunk size) while preserving caller-specified
M:N output splitting at sorted_series boundaries.

Architecture (Husky multi-input → multi-output sorted merge):

  Phase 0 (async) — drain sort cols from each input. With Husky
  column ordering, sort cols + sorted_series are the prefix of each
  row group's body bytes, so the decoder can stop after they are
  fully decoded; the remaining body col pages stay un-read in the
  input stream, ready for phase 3.

  Phase 1 — compute_merge_order over the per-input sort-col
  RecordBatches using the existing k-way (sorted_series,
  timestamp_secs) heap.

  Phase 2 — compute_output_boundaries with the caller's
  num_outputs, splitting at sorted_series transitions.

  Phase 3 (blocking + block_on bridges) — streaming write. All M
  output writers are alive for the duration. For each column in
  Husky order, every output's col K is written in turn:
   - Sort col / sorted_series: applied via arrow::interleave from
     the already-buffered phase-0 data.
   - Body col: each output page is assembled via arrow::interleave
     from input page slices, with decoders advanced page-by-page via
     handle.block_on from inside the sync iterator passed to
     write_next_column_arrays. Pages flush to the writer's sink as
     SerializedColumnWriter's page-size threshold trips — memory
     stays bounded by the in-flight output page plus a small number
     of in-flight input pages.

After all M outputs' col K is done, every input decoder is at the
start of col K+1 in its single row group. Move to col K+1.

PR-6b.2 only handles single-row-group inputs (real or PR-5-
adapter-presented). Multi-RG metric-aligned inputs are rejected
with a clear error message; supporting them requires either
consuming + discarding body cols of RG[i-1] from the stream to
reach RG[i]'s sort cols, or a second body GET — both are larger
scope changes that land in a follow-up.

Page-bounded contract verified by
test_body_col_streams_many_pages_per_column_chunk: with
data_page_row_count_limit=1000 on an 8000-row merge, the output
value column spans ≥ 2 pages, demonstrating that body col writes
respect data_page_size and do not materialise whole column chunks.

Tests (9, all passing): two-input merge, single-RG output for
single-metric_name input, total-rows-preserved across M:N,
sort-schema mismatch rejection, KV metadata propagation,
all-empty-inputs no-output, output drainable by StreamDecoder,
multi-RG input rejection, page-bounded body col streaming.

Also exposes existing helpers in merge/writer.rs as pub(super)
(apply_merge_permutation, build_merge_kv_metadata,
build_sorting_columns, resolve_sort_field_names, verify_sort_order)
so streaming.rs can reuse the same MC-3 / KV / sorting-columns
construction the non-streaming engine uses. PR-7 will fold the
non-streaming engine away.

PR-6c.2 will add file-size monitoring on top: close the current
output at the next sorted_series transition when an in-progress
file approaches the size cap, producing additional splits beyond
the caller's N.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: persist decoder + page cache across body-col passes

Address two Codex review findings on PR-6b.2 (#6409):

* P1 — Preserve decoder/page cache across output chunks. The merge
  engine was constructing a fresh `StreamDecoder` for every
  `advance_decoder_to_row` call, which reset the per-column
  `rows_decoded` counter so the second decoded page reported
  `row_start = 0` after the stream had already advanced. The page
  cache also lived on the per-output assembler, so pages whose row
  range straddled two outputs were dropped when the first output
  finished even though the stream couldn't be rewound. Both
  scenarios produced silently wrong rows or out-of-bounds panics on
  any input large enough to require multi-page advances per output
  or multi-output coverage of a single page.

  The decoder now lives on `InputDecoderState` (owned via the new
  `StreamDecoder::from_owned` constructor), and the per-input body-
  col page cache + cursor are reset only at the start of each body
  column.

* P2 — Stream body pages instead of collecting `Vec<ArrayRef>`. The
  per-output body-col write now feeds `write_next_column_arrays`
  one page at a time via `StreamingBodyColIter`, which captures
  assembly errors in a side cell so memory stays bounded by output-
  page size rather than column-chunk size.

Two regression tests cover the bug shapes — multi-page body col
within one output (2500 rows × 50-row pages) and multi-output input
where pages span output boundaries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: guard body-col path against zero-row-group inputs

Address Codex P1 (third comment) on PR-6b.2 (#6409): phase 0
explicitly accepts inputs with `num_row_groups() == 0` (returning a
zero-row sort batch), but `write_body_col_for_all_outputs`
unconditionally called `state.metadata.row_group(0)` for every
input, panicking with "index out of bounds" before the first body
column was written.

Treat zero-RG inputs the same as inputs whose schema lacks the
current column: push `None` into `input_col_indices` and skip them
for this body col. Also drop the unused `input_target_rows` vec
that was being built only for its row-group lookup side effect.

Regression test `test_zero_row_input_mixed_with_non_empty` builds a
0-row + 50-row pair and merges them through the streaming engine;
without this fix the merge blocking task panics inside parquet-rs's
`row_group()` indexing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: drop all-null sort fields from per-output streaming schema

Address Codex P2 (fourth comment) on PR-6b.2 (#6409): the schema
derivation condition `sort_optimised.has(name) || full_union.has(name)`
was tautologically true for every iterated field — every `field` came
from `full_union_schema`, so the second disjunct was always satisfied
and the intended "drop all-null sort fields" branch never fired.

Pass the sort union schema in explicitly so we can tell sort fields
apart from body fields. Sort field present in `sort_union_schema` →
keep only if `optimize_output_batch` kept it (not all-null for this
output's rows). Body field → keep unconditionally; tracking per-output
body-col presence would require pre-reading every body column for
every output, which is the column-chunk-bounded buffering the
streaming path exists to avoid.

Regression test `test_derive_output_schema_drops_all_null_sort_field`
calls the helper directly with a synthetic union + sort-optimised
pair and asserts an all-null sort field is dropped while a body
field with the same union-schema position is preserved. Verified
the test fails against the pre-fix logic with the expected
`['metric_name', 'env', 'timestamp_secs', 'value']` vs
`['metric_name', 'timestamp_secs', 'value']` mismatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore: code-quality fixes + MC-2 type round-trip test on streaming merge

Bundle three pieces:

- **Husky → neutral phrasing.** Replaced the seven "Husky" mentions in
  the streaming engine's doc-comments and error messages with neutral
  "sort-cols-first storage ordering" / "column ordering" phrasing.
  Project is Quickwit; the internal column-ordering scheme didn't
  need a separate brand in user-visible error strings.

- **One `.unwrap()` → `.expect()` in lib code.** The hashmap lookup
  in `drain_sort_cols_one_input` is guarded by a `contains_key`
  check; promote the implicit invariant to a documented panic
  message per CODE_STYLE.md.

- **`align_inputs_to_union_schema` nullability fix.** The first-sight
  branch unconditionally marked new fields nullable; the existing
  comment claims "columns that don't appear in every input must be
  nullable" but the code applied that rule to every field. Replaced
  with a two-pass scheme: track `any_nullable` and `appears_in` per
  field across all inputs, then mark nullable iff some input had it
  nullable OR the field is missing from some input. This unblocks
  `List<Float64>` columns end-to-end (the writer rejects nullable
  List; the previous behaviour forced every list column nullable on
  first sight even when every input declared it non-null).

- **MC-2 round-trip integration test.** New
  `test_mc2_all_types_round_trip_through_streaming_merge` builds two
  inputs covering every parquet physical type the decoder accepts —
  Int8/16/32/64, UInt8/16/32/64, Float32/64, Bool, Utf8,
  Dictionary<Int32, Utf8>, LargeBinary, and non-nullable
  `List<Float64>` — merges them through the streaming engine, and
  asserts every `(sorted_series_key → body-col tuple)` pair survives
  byte-equal. Storage-encoding transitions (Dict→Utf8, LargeBinary→
  Binary) are normalised in the render helper because MC-2 promises
  value preservation, not internal representation preservation.

  This test caught two real bugs while being written:
  1. Body cols must be declared in lexicographic order — the streaming
     engine assumes the storage convention and crashes mid-merge if
     they aren't. Fixture re-ordered accordingly. (Worth adding
     upfront validation in a follow-up; not in scope here.)
  2. The schema-union nullability bug above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(MS-7): page-cache bounded-memory contract is observable + asserted

Add a `#[cfg(test)] static AtomicUsize` PEAK_BODY_COL_PAGE_CACHE_LEN
that records the maximum length any input's `body_col_page_cache`
ever reached during the current merge, bumped on every page push in
`advance_decoder_to_row`. Zero production overhead — the `record_*`
helper compiles to a no-op outside test builds.

New test `test_ms7_body_col_page_cache_bounded_regardless_of_input_size`
runs the streaming merge over three input sizes (300 / 3 000 / 30 000
rows at 50-row pages) and asserts:
  1. Peak resident pages stays below a fixed ceiling (24, for the
     ratio of OUTPUT_PAGE_ROWS=1024 to input page_rows=50, plus a
     few-page slack for decoder lookahead + transients).
  2. Growth from 3 000 to 30 000 rows (10× more input pages) yields
     at most a 2-page increase in peak. The whole MS-7 claim is that
     peak does not scale with input size.

Verified the test catches a deliberate regression: removing the per-
output-page eviction loop in `assemble_one_output_page` pushed the
3 000-row peak to 60 (60 > 24) and the test failed with the expected
message.

Fixture support: `write_input_parquet_with_small_pages` now also
sets `write_batch_size` and `data_page_size` proportional to the
requested page row count. Without those, the arrow writer's defaults
(64 KiB / 1 MiB) caused `data_page_row_count_limit` to be silently
ignored and produced one giant page per column. Probed the output
via `get_column_page_reader` — 30 000 rows now produces 600 pages
per col as expected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: drive col loop from full union schema + collect service tags from sort col

Address two new Codex P2 findings on PR-6b.2 (#6409):

- **Use the full union schema when driving column writes.** The old
  `build_parent_union_schema` picked one per-output schema by field
  count and used it as the column-iteration driver. If two outputs
  drop *different* all-null sort fields and end up with the same
  field count, the first wins — and any column it dropped is never
  iterated, leaving the other output's writer missing a column or
  writing subsequent columns into the wrong slot. The doc comment
  already said "process the FULL union schema's cols in order"; the
  implementation diverged. Drive `write_all_columns` from
  `full_union_schema` directly and delete the broken heuristic.

- **Collect service names from the sort-col path too.** If the sort
  schema places `service` in the sort key
  (`metric_name|service|...`), the streaming engine writes it via
  the sort-col path and the body-col `track_service` branch never
  runs. `MergeOutputFile.low_cardinality_tags[TAG_SERVICE]` came
  back empty even though every row had a service value. Extract
  service names from `static_meta.sort_optimised` at
  `finalize_output_writer` time so the TAG_SERVICE metadata is
  accurate regardless of which write path the column took.

Two regression tests:
- `test_heterogeneous_dropped_fields_drive_from_full_union_schema`
  builds two inputs whose per-output schemas drop different all-null
  sort fields with the same field count. Each kept tag must survive
  to its output. Verified the test fails (panic on missing column)
  against the pre-fix logic.
- `test_service_as_sort_column_still_populates_low_cardinality_tags`
  uses a sort schema `metric_name|service|-timestamp_secs/V2` and
  asserts the output's `low_cardinality_tags[TAG_SERVICE]` covers
  every distinct service value. Verified the test fails against
  pre-fix `finalize_output_writer` with the expected "must contain
  TAG_SERVICE" message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(streaming): rename to fill_page_cache_to_row + cross-input docstrings

Addresses adamtobey's review on PR-6409.

- Rename `advance_decoder_to_row` → `fill_page_cache_to_row`. The
  function's effect on the world is "add pages to the per-input cache"
  — it never advances a cursor or skips data. The old name primed
  reviewers to ask "are we skipping rows?" (which is exactly what
  Adam asked).
- Use a `rows_for_current_output` register inside
  `compute_input_row_destinations` and write to
  `rows_per_output[out_idx]` once after the inner loop; saves the
  per-row indexed store.
- Expand `body_col_page_cache` docstring with the horizontal-vs-vertical
  memory bound argument and a pointer to the MS-7 invariant test
  (`test_ms7_body_col_page_cache_bounded_regardless_of_input_size`).
- Add context comments at the cross-file invariant sites Adam flagged:
  - Sort-cols-first storage-ordering contract on the sort-col drain.
  - Single-RG-input restriction with forward pointer to PR-6c.2
    (#6424) which relaxes it.
  - `rg_partition_prefix_len` defaulting to 0 (with reference to the
    legacy-promotion `mixed_prefix_ok` escape in PR-6423).

No behaviour change. 461 lib tests pass; workspace clippy + nightly
fmt + rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): assert per-input body cols are in Husky order

Adam's question on L194 asked whether body-col ordering was a hard
cross-file requirement. My first answer said "no" — true for which
array we read (we look up by name), but wrong for the body-col
memory bound:

  Phase 3 iterates the union schema's body cols alphabetically and
  asks each input's decoder to advance to that col. Parquet emits
  column chunks in declared schema order, so the decoder reads pages
  in that input's storage order. If an input's body cols aren't in
  the same alphabetical-after-sort-cols order ("Husky order"),
  fill_page_cache_to_row has to drain every body col preceding the
  requested one on the wire — those pages land in
  body_col_page_caches[col_idx] until that col's turn in the union
  iteration. The cache grows to a full column-chunk's worth per
  misaligned col. Vertical, not horizontal. Defeats streaming.

Catch this at merge entry instead of silently degrading to vertical
caching:

- `assert_inputs_in_husky_body_col_order` runs after
  `build_input_decoders_state` and before phase 0. Bails with a
  concrete error message naming the offending pair of column names.
- New regression test
  `test_assert_inputs_in_husky_body_col_order_rejects_misaligned_input`
  builds an input with body cols `[value, metric_type]` (alphabetical
  would be `[metric_type, value]`) and asserts the merge errors out
  before phase 3.

No production producer violates this today (streaming writer and
legacy Husky writer both emit lexicographic body cols), so the
assertion catches future producer drift, not current traffic.

462 lib tests pass (461 prior + 1 new); workspace clippy + nightly
fmt + rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Base automatically changed from gtt/streaming-merge-engine-merger to gtt/parquet-streaming-base May 14, 2026 18:06
g-talbot and others added 3 commits May 14, 2026 14:09
…PR-6c.2)

Restructures PR-6b.2's flat phase 0 → phase 3 into a per-merge-region
loop. Unlocks multi-RG metric-aligned input support and produces
multi-RG output naturally — one output row group per merge region
(typically one per metric_name when `rg_partition_prefix_len == 1`).

Sort-prefix alignment (`prefix_len >= 1`) guarantees that any merge
region has AT MOST one row group per input. That single invariant
unlocks the restructure:

1. Pre-compute regions from RG metadata. For `prefix_len >= 1`, read
   each RG's metric_name min stat (must equal max — verifies
   metric-alignment). Group RGs across inputs by prefix_key. Sort
   regions by prefix_key. For `prefix_len == 0` (single-RG inputs
   only, validated earlier), one region covers everything.

2. Assign regions to output files by cumulative row count. Caller's
   `num_outputs` preserved as the upper bound. Each output file gets
   a contiguous slice of the region list, so output files have
   non-overlapping key ranges.

3. Per-region processing: for each region, advance contributing
   inputs' decoders through their RGs (drain sort cols of that RG,
   then stream body cols via the existing page-bounded
   BodyColOutputPageAssembler). Each region becomes one output RG in
   the current writer; when the assignment moves to a new output
   file, close the previous writer and open a new one.

The streaming body-col mechanism from PR-6b.2 (arrow::compute::
interleave + handle.block_on driven decoder) is unchanged; it just
runs over smaller row ranges (one region instead of one whole
output).

PR-6b.2's check that rejected any multi-RG input is replaced with:
reject only `prefix_len == 0` AND multi-RG (those still need PR-5's
LegacyMultiRGAdapter). Multi-RG metric-aligned inputs are now
accepted natively.

PR-6b.2 optimised the per-output schema based on per-output sort col
data (drop all-null cols, re-dict-encode low-cardinality strings).
With per-region streaming we don't know each region's content until
we drain it, so PR-6c.2 declares the writer's schema as the full
union schema and leaves output strings as Utf8. Per-output dict
re-encoding can be reintroduced later by tracking cardinality during
the streaming pass.

- All 9 PR-6b.2 tests still pass (single-RG input regression —
  behaviour preserved).
- New test_multi_rg_metric_aligned_input_produces_multi_rg_output:
  feeds a 2-RG metric-aligned input (prefix_len = 1, RG 0 =
  cpu.usage, RG 1 = memory.used); the streaming engine accepts it
  and produces a 2-RG output (one RG per metric_name region).
- Renamed test_multi_rg_input_rejected →
  test_legacy_multi_rg_input_rejected to reflect the new rejection
  scope (only prefix_len == 0 multi-RG is rejected; metric-aligned
  is accepted).

10/10 streaming tests pass. Clippy, doc, machete, fmt all clean.

1. File-size cap with sort-key-boundary splits.
2. Per-output schema optimisation (track region body-col cardinality
   during the streaming pass).
3. Mid-region splits at sorted_series transitions for finer-grained
   M:N control when callers want more outputs than regions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two P1 bugs flagged by Codex on PR-6c.2 (#6410):

1. **Duplicate input row groups silently dropped.** When one input
   contained two RGs with the same composite prefix key,
   `process_region` overwrote `sort_col_batches[input_idx]` while
   `Region::total_rows` still counted both — losing rows and
   misaligning the body-col / sort-col mapping. Now enforce
   at-most-one-RG-per-input-per-prefix as a strong invariant at three
   sites: the merge read path (`extract_regions_from_metadata`), the
   streaming merge output finalize, and the indexing writer
   (`ParquetWriter::write_to_bytes` / `write_to_file_with_metadata`).
   The new `assert_unique_rg_prefix_keys` helper is shared.

2. **Byte-array prefix encoding broke lex order across lengths.**
   The 4-byte length prefix made `"b"` sort before `"aa"`, violating
   the declared ASC order. Switched to byte-stuffed escape encoding
   (`0x00` → `0x00 0x01`, terminator `0x00 0x00`), which preserves
   single-column lex order AND retains unambiguous concatenation for
   composite keys (the terminator is the smallest 2-byte sequence
   under escaping, so shorter values still sort before longer ones
   with the same prefix).

Tests:
- `test_byte_array_prefix_preserves_lex_order_across_lengths` —
  `"aa" < "b"`, empty < non-empty, shared-prefix shorter < longer,
  null-byte escaping preserves order.
- `test_streaming_merge_rejects_duplicate_prefix_rgs_in_one_input` —
  end-to-end bail with clear error.
- `test_write_to_bytes_rejects_duplicate_rg_prefix_when_claimed_aligned`
  + the `write_to_file` and single-RG positive counterparts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…i-output

When inputs declare rg_partition_prefix_len = 0 (legacy single-RG)
and the caller asks for num_outputs > 1, the engine subdivides the
single region at sorted_series transitions in the merge order so it
can honor the output count. A single sorted_series run is never
broken; if one run exceeds the remaining budget the whole run lands
in one output anyway. The output inherits the input's
rg_partition_prefix_len (=0) — the engine does not synthesize a
prefix it can't unconditionally guarantee.

Also handles the giant-single-metric case (prefix_len=0, one
metric_name, num_outputs > 1): sorted_series transitions still
split the merge order even though there are no metric_name
transitions to drive a prefix synthesis.

Implementation:
- New `split_region_at_sorted_series` in region_grouping: walks the merge order and splits at
  sorted_series transitions when accumulated rows reach the target budget.
- Main engine loop: when num_outputs > current_output_idx + 1 AND region's rows exceed the
  remaining budget, drain sort cols for the region, compute merge order, call
  split_region_at_sorted_series, process sub-regions.
- Per-col page cache + cursor keyed by col_idx so the body-col path can read pages once and re-use
  them across sub-regions within the same top-level region. Resets between top-level regions
  (different RGs).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/streaming-merge-engine-multi-rg-1-engine branch from 7b8317b to a5dfd72 Compare May 14, 2026 18:10
The MS-2 validation path returns `Err` via `bail!()` (anyhow), not a
panic / abort. Five doc-comment / inline-comment sites described the
failure as "the engine would crash mid-merge" — overstated. Callers
get a `Result::Err` propagated up the spawn_blocking task and the
`streaming_merge_sorted_parquet_files` return.

Sites updated:
- `region_grouping.rs` module doc.
- `validate_region_order_matches_physical_rg_order` doc.
- streaming.rs MS-2 validation call-site comment.
- Test docstrings for `test_streaming_merge_with_desc_prefix_col` and
  `test_ms2_region_order_disagrees_with_physical_rg_order_rejected`.

No behaviour change. 477 lib tests pass; clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant