Skip to content

Commit 5475ed9

Browse files
committed
fix: prevent hash join deadlock when dynamic filtering is enabled
Empty partitions were short-circuiting to Completed state without reporting to the shared accumulator. This caused other partitions to wait indefinitely at the barrier. This fix ensures that even empty partitions proceed to report their data if a build accumulator is present. Made-with: Cursor
1 parent 2818abb commit 5475ed9

File tree

1 file changed

+19
-10
lines changed
  • datafusion/physical-plan/src/joins/hash_join

1 file changed

+19
-10
lines changed

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -408,12 +408,9 @@ impl HashJoinStream {
408408

409409
/// Returns the next state after the build side has been fully collected
410410
/// and any required build-side coordination has completed.
411-
fn state_after_build_ready(
412-
join_type: JoinType,
413-
left_data: &JoinLeftData,
414-
) -> HashJoinStreamState {
411+
fn state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState {
415412
if left_data.map().is_empty()
416-
&& join_type.empty_build_side_produces_empty_result()
413+
&& self.join_type.empty_build_side_produces_empty_result()
417414
{
418415
HashJoinStreamState::Completed
419416
} else {
@@ -485,8 +482,7 @@ impl HashJoinStream {
485482
ready!(fut.get_shared(cx))?;
486483
}
487484
let build_side = self.build_side.try_as_ready()?;
488-
self.state =
489-
Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref());
485+
self.state = self.state_after_build_ready(build_side.left_data.as_ref());
490486
Poll::Ready(Ok(StatefulStreamResult::Continue))
491487
}
492488

@@ -557,8 +553,7 @@ impl HashJoinStream {
557553
}));
558554
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
559555
} else {
560-
self.state =
561-
Self::state_after_build_ready(self.join_type, left_data.as_ref());
556+
self.state = self.state_after_build_ready(left_data.as_ref());
562557
}
563558

564559
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
@@ -668,7 +663,21 @@ impl HashJoinStream {
668663
if is_empty {
669664
// Invariant: state_after_build_ready should have already completed
670665
// join types whose result is fixed to empty when the build side is empty.
671-
debug_assert!(!self.join_type.empty_build_side_produces_empty_result());
666+
//
667+
// However, when dynamic filtering is enabled, we skip the short-circuit
668+
// in state_after_build_ready to ensure all partitions report their
669+
// build-side data. In this case, we might reach here with an empty
670+
// build side even for join types that produce empty results.
671+
if self.build_accumulator.is_none() {
672+
debug_assert!(!self.join_type.empty_build_side_produces_empty_result());
673+
}
674+
675+
if self.join_type.empty_build_side_produces_empty_result() {
676+
timer.done();
677+
self.state = HashJoinStreamState::FetchProbeBatch;
678+
return Ok(StatefulStreamResult::Continue);
679+
}
680+
672681
let result = build_batch_empty_build_side(
673682
&self.schema,
674683
build_side.left_data.batch(),

0 commit comments

Comments
 (0)