Skip to content

Commit 6c0e4fb

Browse files
authored
SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join (#11041)
* Fix: Sort Merge Join crashes on TPCH Q21 * Fix LeftAnti SMJ join when the join filter is set * rm dbg * Minor: disable fuzz test to avoid CI spontaneous failures * Minor: disable fuzz test to avoid CI spontaneous failures * Fix: Sort Merge Join crashes on TPCH Q21 * Fix LeftAnti SMJ join when the join filter is set * rm dbg * Minor: disable fuzz test to avoid CI spontaneous failures * Minor: disable fuzz test to avoid CI spontaneous failures * Minor: Add routine to debug join fuzz tests * Minor: Add routine to debug join fuzz tests * Minor: Add routine to debug join fuzz tests * Minor: Add routine to debug join fuzz tests * Minor: Add routine to debug join fuzz tests * SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join * SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join * SMJ: fix streaming row concurrency issue for LEFT SEMI filtered join
1 parent a22423d commit 6c0e4fb

File tree

2 files changed

+110
-18
lines changed

2 files changed

+110
-18
lines changed

datafusion/core/tests/fuzz_cases/join_fuzz.rs

Lines changed: 89 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,6 @@ async fn test_semi_join_1k() {
215215
.await
216216
}
217217

218-
// The test is flaky
219-
// https://github.com/apache/datafusion/issues/10886
220-
// SMJ produces 1 more row in the output
221-
#[ignore]
222218
#[tokio::test]
223219
async fn test_semi_join_1k_filtered() {
224220
JoinFuzzTestCase::new(
@@ -442,18 +438,45 @@ impl JoinFuzzTestCase {
442438

443439
if debug {
444440
println!("The debug is ON. Input data will be saved");
445-
let out_dir_name = &format!("fuzz_test_debug_batch_size_{batch_size}");
446-
Self::save_as_parquet(&self.input1, out_dir_name, "input1");
447-
Self::save_as_parquet(&self.input2, out_dir_name, "input2");
441+
let fuzz_debug = "fuzz_test_debug";
442+
std::fs::remove_dir_all(fuzz_debug).unwrap_or(());
443+
std::fs::create_dir_all(fuzz_debug).unwrap();
444+
let out_dir_name = &format!("{fuzz_debug}/batch_size_{batch_size}");
445+
Self::save_partitioned_batches_as_parquet(
446+
&self.input1,
447+
out_dir_name,
448+
"input1",
449+
);
450+
Self::save_partitioned_batches_as_parquet(
451+
&self.input2,
452+
out_dir_name,
453+
"input2",
454+
);
448455

449456
if join_tests.contains(&JoinTestType::NljHj) {
450-
Self::save_as_parquet(&nlj_collected, out_dir_name, "nlj");
451-
Self::save_as_parquet(&hj_collected, out_dir_name, "hj");
457+
Self::save_partitioned_batches_as_parquet(
458+
&nlj_collected,
459+
out_dir_name,
460+
"nlj",
461+
);
462+
Self::save_partitioned_batches_as_parquet(
463+
&hj_collected,
464+
out_dir_name,
465+
"hj",
466+
);
452467
}
453468

454469
if join_tests.contains(&JoinTestType::HjSmj) {
455-
Self::save_as_parquet(&hj_collected, out_dir_name, "hj");
456-
Self::save_as_parquet(&smj_collected, out_dir_name, "smj");
470+
Self::save_partitioned_batches_as_parquet(
471+
&hj_collected,
472+
out_dir_name,
473+
"hj",
474+
);
475+
Self::save_partitioned_batches_as_parquet(
476+
&smj_collected,
477+
out_dir_name,
478+
"smj",
479+
);
457480
}
458481
}
459482

@@ -527,11 +550,26 @@ impl JoinFuzzTestCase {
527550
/// as parquet files, preserving partitioning.
528551
/// Once the data is saved, it is possible to run a custom test on top of the saved data and debug it
529552
///
553+
/// #[tokio::test]
554+
/// async fn test1() {
555+
/// let left: Vec<RecordBatch> = JoinFuzzTestCase::load_partitioned_batches_from_parquet("fuzz_test_debug/batch_size_2/input1").await.unwrap();
556+
/// let right: Vec<RecordBatch> = JoinFuzzTestCase::load_partitioned_batches_from_parquet("fuzz_test_debug/batch_size_2/input2").await.unwrap();
557+
///
558+
/// JoinFuzzTestCase::new(
559+
/// left,
560+
/// right,
561+
/// JoinType::LeftSemi,
562+
/// Some(Box::new(col_lt_col_filter)),
563+
/// )
564+
/// .run_test(&[JoinTestType::HjSmj], false)
565+
/// .await
566+
/// }
567+
///
530568
/// let ctx: SessionContext = SessionContext::new();
531569
/// let df = ctx
532570
/// .read_parquet(
533571
/// "/tmp/input1/*.parquet",
534-
/// ParquetReadOptions::default(),
572+
/// datafusion::prelude::ParquetReadOptions::default(),
535573
/// )
536574
/// .await
537575
/// .unwrap();
@@ -540,7 +578,7 @@ impl JoinFuzzTestCase {
540578
/// let df = ctx
541579
/// .read_parquet(
542580
/// "/tmp/input2/*.parquet",
543-
/// ParquetReadOptions::default(),
581+
/// datafusion::prelude::ParquetReadOptions::default(),
544582
/// )
545583
/// .await
546584
/// .unwrap();
@@ -554,8 +592,11 @@ impl JoinFuzzTestCase {
554592
/// )
555593
/// .run_test()
556594
/// .await
557-
/// }
558-
fn save_as_parquet(input: &[RecordBatch], output_dir: &str, out_name: &str) {
595+
fn save_partitioned_batches_as_parquet(
596+
input: &[RecordBatch],
597+
output_dir: &str,
598+
out_name: &str,
599+
) {
559600
let out_path = &format!("{output_dir}/{out_name}");
560601
std::fs::remove_dir_all(out_path).unwrap_or(());
561602
std::fs::create_dir_all(out_path).unwrap();
@@ -576,6 +617,39 @@ impl JoinFuzzTestCase {
576617

577618
println!("The data {out_name} saved as parquet into {out_path}");
578619
}
620+
621+
/// Read parquet files preserving partitions, i.e. 1 file -> 1 partition
622+
/// Files can be of different sizes
623+
/// The method can be useful to read partitions that have been saved by `save_partitioned_batches_as_parquet`
624+
/// for test debugging purposes
625+
#[allow(dead_code)]
626+
async fn load_partitioned_batches_from_parquet(
627+
dir: &str,
628+
) -> std::io::Result<Vec<RecordBatch>> {
629+
let ctx: SessionContext = SessionContext::new();
630+
let mut batches: Vec<RecordBatch> = vec![];
631+
632+
for entry in std::fs::read_dir(dir)? {
633+
let entry = entry?;
634+
let path = entry.path();
635+
636+
if path.is_file() {
637+
let mut batch = ctx
638+
.read_parquet(
639+
path.to_str().unwrap(),
640+
datafusion::prelude::ParquetReadOptions::default(),
641+
)
642+
.await
643+
.unwrap()
644+
.collect()
645+
.await
646+
.unwrap();
647+
648+
batches.append(&mut batch);
649+
}
650+
}
651+
Ok(batches)
652+
}
579653
}
580654

581655
/// Return randomly sized record batches with:

datafusion/physical-plan/src/joins/sort_merge_join.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,17 +1532,21 @@ fn get_filtered_join_mask(
15321532
for i in 0..streamed_indices_length {
15331533
// LeftSemi respects only the first true value for a specific streaming index;
15341534
// other true values for the same index must be false
1535-
if mask.value(i) && !seen_as_true {
1535+
let streamed_idx = streamed_indices.value(i);
1536+
if mask.value(i)
1537+
&& !seen_as_true
1538+
&& !matched_indices.contains(&streamed_idx)
1539+
{
15361540
seen_as_true = true;
15371541
corrected_mask.append_value(true);
1538-
filter_matched_indices.push(streamed_indices.value(i));
1542+
filter_matched_indices.push(streamed_idx);
15391543
} else {
15401544
corrected_mask.append_value(false);
15411545
}
15421546

15431547
// if we switched to the next streaming index (e.g. from 0 to 1, or from 1 to 2), we reset the seen_as_true flag
15441548
if i < streamed_indices_length - 1
1545-
&& streamed_indices.value(i) != streamed_indices.value(i + 1)
1549+
&& streamed_idx != streamed_indices.value(i + 1)
15461550
{
15471551
seen_as_true = false;
15481552
}
@@ -2940,6 +2944,20 @@ mod tests {
29402944
))
29412945
);
29422946

2947+
assert_eq!(
2948+
get_filtered_join_mask(
2949+
LeftSemi,
2950+
&UInt64Array::from(vec![0, 0, 0, 1, 1, 1]),
2951+
&BooleanArray::from(vec![true, false, false, false, false, true]),
2952+
&HashSet::from_iter(vec![1]),
2953+
&0,
2954+
),
2955+
Some((
2956+
BooleanArray::from(vec![true, false, false, false, false, false]),
2957+
vec![0]
2958+
))
2959+
);
2960+
29432961
Ok(())
29442962
}
29452963

0 commit comments

Comments
 (0)