feat: Add memory-limited execution for NestedLoopJoinExec#21448
feat: Add memory-limited execution for NestedLoopJoinExec#21448viirya merged 6 commits intoapache:mainfrom
Conversation
2010YOUY01
left a comment
There was a problem hiding this comment.
Thank you — this idea looks good to me. I’ve left a few comments; this should be ready to go once the repeated left-side evaluation issue is addressed.
| // ======================================================================== | ||
| /// Left input stream for incremental buffering (memory-limited mode only). | ||
| /// None when using the standard OnceFut path. | ||
| left_stream: Option<SendableRecordBatchStream>, |
There was a problem hiding this comment.
A minor style point, possibly for follow-up: I noticed in the multiple places that, the memory-limited execution logic is using Option fields to implicitly check the current fallback state.
It might be clearer to introduce a dedicated state enum, for example:
enum NljMemLimitedState {
Unsupported, // this join type cannot use memory-limited fallback
FirstRightPass,
SubsequentRightPass,
// ...
}
This would make the execution state explicit, and the Option fields could then be used purely as payload (or for sanity checks), rather than also acting as implicit state flags.
There was a problem hiding this comment.
Good suggestion! The current approach does use Option fields as implicit state flags (e.g., left_stream.is_some() to check if we're in memory-limited mode), which is fragile. I'll introduce a dedicated enum to make the execution state explicit. Will address in a follow-up or in this PR if you prefer.
| left_reservation: Option<MemoryReservation>, | ||
| /// A batch that couldn't be added to the current chunk due to memory limit. | ||
| /// It will be the first batch in the next chunk. | ||
| left_stashed_batch: Option<RecordBatch>, |
There was a problem hiding this comment.
Also optional, I feel it's simpler to eliminate this special case, and directly put this stashed batch to the end of left_pending_batches, since it's already in memory.
There was a problem hiding this comment.
You're right — the stashed batch is already in memory, so there's no need for a separate field. I can push it directly onto left_pending_batches when the memory limit is hit, instead of stashing it separately and re-inserting on the next BufferingLeft entry. Will simplify this.
| /// Incrementally polls the left stream and accumulates batches until: | ||
| /// - Memory reservation fails (chunk is full, more data remains) | ||
| /// - Left stream is exhausted (this is the last/only chunk) | ||
| fn handle_buffering_left_memory_limited( |
There was a problem hiding this comment.
I think this issue should be addressed first:
In the regular in-memory path, the left side is evaluated only once, buffered, and shared across all partitions via OnceFut<JoinLeftData>.
In contrast, the memory-limited fallback re-evaluates the left side for each partition. This can be inefficient and may even increase memory pressure. Ideally, we would like the left side to be evaluated only once, similar to the in-memory path. (I can imagine this is tricky due to the use of OnceFut... I'm thinking if there is any easy way to do it)
There was a problem hiding this comment.
This is a valid concern. Currently each partition independently re-executes the left child on OOM fallback. This is because the original left stream is consumed by the failed collect_left_input attempt and cannot be reused, so each partition must re-execute the left child to get a fresh stream.
A possible improvement: have the first partition that hits OOM spill the left side to disk, then other partitions read from that spill file instead of re-executing the left child. This would require some cross-partition coordination (e.g., an OnceAsync-like mechanism for the spilled left data).
That said, this only happens on the OOM fallback path, and the cost is executing the left child plan N times (where N = right partition count) instead of once. I think this is acceptable for an initial implementation since the alternative before this PR is a hard OOM failure. Would you prefer I add the shared-spill optimization in this PR or as a follow-up?
There was a problem hiding this comment.
Sounds good -- we could do this as the first follow-up PR to keep this PR simpler.
|
|
||
| // ======================================================================== | ||
| // Memory-limited execution tests | ||
| // ======================================================================== |
There was a problem hiding this comment.
I recommend to add some e2e tests: in slt, run NLJ queries with generate_series() table source, and check spill metrics from explain analyze
There was a problem hiding this comment.
Good idea! I'll add sqllogictest cases that use generate_series() with a tight memory limit and verify spill metrics in EXPLAIN ANALYZE output.
This gives us true end-to-end coverage beyond the unit tests.
There was a problem hiding this comment.
I will update the end-to-end test once #21565 is merged.
|
Thanks again! This PR should be ready once the slt tests are updated. Please let me know if you plan to apply any refactors in this PR, or we could do in follow-ups. |
…21565) ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change LazyMemoryExec::execute() shares the same generator instance across multiple calls via Arc::clone, so a second call to execute(0) continues from where the first left off instead of starting from the beginning. This is inconsistent with how other ExecutionPlan implementations behave, where each execute() call produces an independent stream. This was discovered while writing e2e tests for NestedLoopJoinExec memory-limited execution (#21448), where the OOM fallback path re-executes the left child plan and got incomplete results. ## What changes are included in this PR? LazyMemoryExec::execute() was sharing the same generator instance (via Arc::clone) across multiple calls, causing streams to share mutable state. This meant a second call to execute(0) would continue from where the first call left off, instead of starting from the beginning. Fix by calling reset_state() on the generator to create a fresh instance for each execute() call, matching the expected ExecutionPlan semantics that each execute() produces an independent stream. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Unit test ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
Implement multi-pass execution strategy for NestedLoopJoinExec when the left (build) side exceeds the memory budget. Instead of failing with OOM, the operator now: 1. Buffers left-side data in chunks that fit within memory limits 2. Spills the right side to disk on the first pass via SpillManager 3. Re-reads the right side from the spill file for each subsequent left chunk This is enabled automatically when disk spilling is available and the right side has a single partition. Multi-partition right side falls back to the existing OnceFut-based path. Phase 1 supports INNER, LEFT, LEFT SEMI, LEFT ANTI, and LEFT MARK join types. RIGHT/FULL joins with global right bitmap tracking will follow in a later phase. Tracking issue: apache#15760 Co-authored-by: Isaac
RIGHT, FULL, RIGHT SEMI, RIGHT ANTI, and RIGHT MARK joins require tracking which right-side rows have been matched across ALL left chunks. The current implementation only tracks right-side matches per-batch within a single left chunk, which would silently produce incorrect results in multi-pass mode. Gate the memory-limited path on `!need_produce_right_in_final()` so these join types fall back to the standard OnceFut path. A global right bitmap spanning all left chunks will be added in Phase 3. Co-authored-by: Isaac
Instead of deciding the execution path at execute() time, always attempt to load all left data in memory via OnceFut first. If that fails with ResourcesExhausted, each partition independently falls back to memory-limited mode by: 1. Re-executing the left child to get a fresh stream 2. Setting up SpillManager for right-side spilling 3. Switching to incremental chunked loading This removes the right_partition_count == 1 restriction — fallback now works regardless of how many right partitions exist. Each partition independently re-executes the left child on OOM. The fallback is gated on: - Disk manager supports temp files - Join type supports multi-pass (!need_produce_right_in_final) Co-authored-by: Isaac
Address review feedback: instead of stashing the batch that triggered the memory limit in a separate field, push it directly into left_pending_batches. The batch is already in memory, so there is no need for special handling on re-entry. Co-authored-by: Isaac
When transitioning from EmitLeftUnmatched back to BufferingLeft for the next left chunk, the output buffer may still contain a completed batch from the previous ProbeRight phase. Flush it before discarding the current chunk to avoid silently dropping join results. Co-authored-by: Isaac
8f5ceb4 to
e84b5b6
Compare
Add sqllogictest e2e tests for NestedLoopJoinExec memory-limited execution using generate_series with tight memory limits to verify correct results under multi-pass spill. Co-authored-by: Isaac
e84b5b6 to
8350d3a
Compare
Thank you @2010YOUY01. I have added the end-to-end coverage in the slt test. |
…pache#21565) ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes #. ## Rationale for this change LazyMemoryExec::execute() shares the same generator instance across multiple calls via Arc::clone, so a second call to execute(0) continues from where the first left off instead of starting from the beginning. This is inconsistent with how other ExecutionPlan implementations behave, where each execute() call produces an independent stream. This was discovered while writing e2e tests for NestedLoopJoinExec memory-limited execution (apache#21448), where the OOM fallback path re-executes the left child plan and got incomplete results. ## What changes are included in this PR? LazyMemoryExec::execute() was sharing the same generator instance (via Arc::clone) across multiple calls, causing streams to share mutable state. This meant a second call to execute(0) would continue from where the first call left off, instead of starting from the beginning. Fix by calling reset_state() on the generator to create a fresh instance for each execute() call, matching the expected ExecutionPlan semantics that each execute() produces an independent stream. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Unit test ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
Which issue does this PR close?
Rationale for this change
NestedLoopJoinExec currently fails with an OOM error when the left (build) side exceeds the memory budget. This PR adds a spill-to-disk fallback so the query can complete instead of crashing.
What changes are included in this PR?
When collect_left_input via OnceFut fails with ResourcesExhausted, each partition independently falls back to a memory-limited multi-pass strategy:
The fallback is transparent — if memory is sufficient, the existing OnceFut path is used with zero overhead. It is currently gated to join types that don't require global right-side bitmap tracking (INNER, LEFT, LEFT SEMI, LEFT ANTI, LEFT MARK). RIGHT/FULL joins retain the existing OOM behavior until adding a cross-chunk right bitmap.
Are these changes tested?
Unit tests
Are there any user-facing changes?
No