Skip to content

Commit 0a82ac3

Browse files
authored
minor: improve join fuzz tests debug kit (#12397)
1 parent 09f7592 commit 0a82ac3

File tree

1 file changed

+63
-60
lines changed

1 file changed

+63
-60
lines changed

datafusion/core/tests/fuzz_cases/join_fuzz.rs

+63-60
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::sync::Arc;
19-
2018
use arrow::array::{ArrayRef, Int32Array};
2119
use arrow::compute::SortOptions;
2220
use arrow::record_batch::RecordBatch;
2321
use arrow::util::pretty::pretty_format_batches;
2422
use arrow_schema::Schema;
23+
use std::sync::Arc;
24+
use std::time::SystemTime;
2525

2626
use datafusion_common::ScalarValue;
2727
use datafusion_physical_expr::expressions::Literal;
@@ -474,11 +474,34 @@ impl JoinFuzzTestCase {
474474
let smj_rows = smj_collected.iter().fold(0, |acc, b| acc + b.num_rows());
475475
let nlj_rows = nlj_collected.iter().fold(0, |acc, b| acc + b.num_rows());
476476

477-
if debug {
477+
// compare
478+
let smj_formatted =
479+
pretty_format_batches(&smj_collected).unwrap().to_string();
480+
let hj_formatted = pretty_format_batches(&hj_collected).unwrap().to_string();
481+
let nlj_formatted =
482+
pretty_format_batches(&nlj_collected).unwrap().to_string();
483+
484+
let mut smj_formatted_sorted: Vec<&str> =
485+
smj_formatted.trim().lines().collect();
486+
smj_formatted_sorted.sort_unstable();
487+
488+
let mut hj_formatted_sorted: Vec<&str> =
489+
hj_formatted.trim().lines().collect();
490+
hj_formatted_sorted.sort_unstable();
491+
492+
let mut nlj_formatted_sorted: Vec<&str> =
493+
nlj_formatted.trim().lines().collect();
494+
nlj_formatted_sorted.sort_unstable();
495+
496+
if debug
497+
&& ((join_tests.contains(&JoinTestType::NljHj) && nlj_rows != hj_rows)
498+
|| (join_tests.contains(&JoinTestType::HjSmj) && smj_rows != hj_rows))
499+
{
478500
let fuzz_debug = "fuzz_test_debug";
479501
std::fs::remove_dir_all(fuzz_debug).unwrap_or(());
480502
std::fs::create_dir_all(fuzz_debug).unwrap();
481503
let out_dir_name = &format!("{fuzz_debug}/batch_size_{batch_size}");
504+
println!("Test result data mismatch found. HJ rows {}, SMJ rows {}, NLJ rows {}", hj_rows, smj_rows, nlj_rows);
482505
println!("The debug is ON. Input data will be saved to {out_dir_name}");
483506

484507
Self::save_partitioned_batches_as_parquet(
@@ -492,7 +515,15 @@ impl JoinFuzzTestCase {
492515
"input2",
493516
);
494517

495-
if join_tests.contains(&JoinTestType::NljHj) {
518+
if join_tests.contains(&JoinTestType::NljHj)
519+
&& join_tests.contains(&JoinTestType::NljHj)
520+
&& nlj_rows != hj_rows
521+
{
522+
println!("=============== HashJoinExec ==================");
523+
hj_formatted_sorted.iter().for_each(|s| println!("{}", s));
524+
println!("=============== NestedLoopJoinExec ==================");
525+
smj_formatted_sorted.iter().for_each(|s| println!("{}", s));
526+
496527
Self::save_partitioned_batches_as_parquet(
497528
&nlj_collected,
498529
out_dir_name,
@@ -505,7 +536,12 @@ impl JoinFuzzTestCase {
505536
);
506537
}
507538

508-
if join_tests.contains(&JoinTestType::HjSmj) {
539+
if join_tests.contains(&JoinTestType::HjSmj) && smj_rows != hj_rows {
540+
println!("=============== HashJoinExec ==================");
541+
hj_formatted_sorted.iter().for_each(|s| println!("{}", s));
542+
println!("=============== SortMergeJoinExec ==================");
543+
smj_formatted_sorted.iter().for_each(|s| println!("{}", s));
544+
509545
Self::save_partitioned_batches_as_parquet(
510546
&hj_collected,
511547
out_dir_name,
@@ -519,25 +555,6 @@ impl JoinFuzzTestCase {
519555
}
520556
}
521557

522-
// compare
523-
let smj_formatted =
524-
pretty_format_batches(&smj_collected).unwrap().to_string();
525-
let hj_formatted = pretty_format_batches(&hj_collected).unwrap().to_string();
526-
let nlj_formatted =
527-
pretty_format_batches(&nlj_collected).unwrap().to_string();
528-
529-
let mut smj_formatted_sorted: Vec<&str> =
530-
smj_formatted.trim().lines().collect();
531-
smj_formatted_sorted.sort_unstable();
532-
533-
let mut hj_formatted_sorted: Vec<&str> =
534-
hj_formatted.trim().lines().collect();
535-
hj_formatted_sorted.sort_unstable();
536-
537-
let mut nlj_formatted_sorted: Vec<&str> =
538-
nlj_formatted.trim().lines().collect();
539-
nlj_formatted_sorted.sort_unstable();
540-
541558
if join_tests.contains(&JoinTestType::NljHj) {
542559
let err_msg_rowcnt = format!("NestedLoopJoinExec and HashJoinExec produced different row counts, batch_size: {}", batch_size);
543560
assert_eq!(nlj_rows, hj_rows, "{}", err_msg_rowcnt.as_str());
@@ -602,34 +619,6 @@ impl JoinFuzzTestCase {
602619
/// )
603620
/// .run_test(&[JoinTestType::HjSmj], false)
604621
/// .await;
605-
///
606-
/// let ctx: SessionContext = SessionContext::new();
607-
/// let df = ctx
608-
/// .read_parquet(
609-
/// "/tmp/input1/*.parquet",
610-
/// datafusion::prelude::ParquetReadOptions::default(),
611-
/// )
612-
/// .await
613-
/// .unwrap();
614-
/// let left = df.collect().await.unwrap();
615-
///
616-
/// let df = ctx
617-
/// .read_parquet(
618-
/// "/tmp/input2/*.parquet",
619-
/// datafusion::prelude::ParquetReadOptions::default(),
620-
/// )
621-
/// .await
622-
/// .unwrap();
623-
///
624-
/// let right = df.collect().await.unwrap();
625-
/// JoinFuzzTestCase::new(
626-
/// left,
627-
/// right,
628-
/// JoinType::LeftSemi,
629-
/// Some(Box::new(less_than_100_join_filter)),
630-
/// )
631-
/// .run_test()
632-
/// .await
633622
/// }
634623
fn save_partitioned_batches_as_parquet(
635624
input: &[RecordBatch],
@@ -641,9 +630,15 @@ impl JoinFuzzTestCase {
641630
std::fs::create_dir_all(out_path).unwrap();
642631

643632
input.iter().enumerate().for_each(|(idx, batch)| {
644-
let mut file =
645-
std::fs::File::create(format!("{out_path}/file_{}.parquet", idx))
646-
.unwrap();
633+
let file_path = format!("{out_path}/file_{}.parquet", idx);
634+
let mut file = std::fs::File::create(&file_path).unwrap();
635+
println!(
636+
"{}: Saving batch idx {} rows {} to parquet {}",
637+
&out_name,
638+
idx,
639+
batch.num_rows(),
640+
&file_path
641+
);
647642
let mut writer = parquet::arrow::ArrowWriter::try_new(
648643
&mut file,
649644
input.first().unwrap().schema(),
@@ -653,8 +648,6 @@ impl JoinFuzzTestCase {
653648
writer.write(batch).unwrap();
654649
writer.close().unwrap();
655650
});
656-
657-
println!("The data {out_name} saved as parquet into {out_path}");
658651
}
659652

660653
/// Read parquet files preserving partitions, i.e. 1 file -> 1 partition
@@ -667,10 +660,20 @@ impl JoinFuzzTestCase {
667660
) -> std::io::Result<Vec<RecordBatch>> {
668661
let ctx: SessionContext = SessionContext::new();
669662
let mut batches: Vec<RecordBatch> = vec![];
663+
let mut entries = std::fs::read_dir(dir)?
664+
.map(|res| res.map(|e| e.path()))
665+
.collect::<Result<Vec<_>, std::io::Error>>()?;
666+
667+
// important to read files using the same order as they have been written
668+
// sort by modification time
669+
entries.sort_by_key(|path| {
670+
std::fs::metadata(path)
671+
.and_then(|metadata| metadata.modified())
672+
.unwrap_or(SystemTime::UNIX_EPOCH)
673+
});
670674

671-
for entry in std::fs::read_dir(dir)? {
672-
let entry = entry?;
673-
let path = entry.path();
675+
for entry in entries {
676+
let path = entry.as_path();
674677

675678
if path.is_file() {
676679
let mut batch = ctx

0 commit comments

Comments
 (0)