Skip to content

Commit

Permalink
Minor: SMJ fuzz tests fix for rowcounts (apache#10891)
Browse files Browse the repository at this point in the history
* Fix: Sort Merge Join crashes on TPCH Q21

* Fix LeftAnti SMJ join when the join filter is set

* rm dbg

* Minor: Fix fuzz testing row counts
  • Loading branch information
comphead authored Jun 12, 2024
1 parent dfdda7c commit 908a3a1
Showing 1 changed file with 31 additions and 24 deletions.
55 changes: 31 additions & 24 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn test_inner_join_1k() {
.await
}

fn less_than_10_join_filter(schema1: Arc<Schema>, _schema2: Arc<Schema>) -> JoinFilter {
fn less_than_100_join_filter(schema1: Arc<Schema>, _schema2: Arc<Schema>) -> JoinFilter {
let less_than_100 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Lt,
Expand All @@ -77,7 +77,7 @@ async fn test_inner_join_1k_filtered() {
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::Inner,
Some(Box::new(less_than_10_join_filter)),
Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
Expand Down Expand Up @@ -113,7 +113,7 @@ async fn test_left_join_1k_filtered() {
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::Left,
Some(Box::new(less_than_10_join_filter)),
Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
Expand All @@ -138,7 +138,7 @@ async fn test_right_join_1k_filtered() {
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::Right,
Some(Box::new(less_than_10_join_filter)),
Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
Expand All @@ -162,7 +162,7 @@ async fn test_full_join_1k_filtered() {
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::Full,
Some(Box::new(less_than_10_join_filter)),
Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
Expand All @@ -179,15 +179,14 @@ async fn test_semi_join_1k() {
.run_test()
.await
}
// See https://github.com/apache/datafusion/issues/10886
#[ignore]

#[tokio::test]
async fn test_semi_join_1k_filtered() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::LeftSemi,
Some(Box::new(less_than_10_join_filter)),
Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
Expand All @@ -213,7 +212,7 @@ async fn test_anti_join_1k_filtered() {
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::LeftAnti,
Some(Box::new(less_than_10_join_filter)),
Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
Expand Down Expand Up @@ -392,6 +391,15 @@ impl JoinFuzzTestCase {
let hj = self.hash_join();
let hj_collected = collect(hj, task_ctx.clone()).await.unwrap();

// Get actual row counts(without formatting overhead) for HJ and SMJ
let hj_rows = hj_collected.iter().fold(0, |acc, b| acc + b.num_rows());
let smj_rows = smj_collected.iter().fold(0, |acc, b| acc + b.num_rows());

assert_eq!(
hj_rows, smj_rows,
"SortMergeJoinExec and HashJoinExec produced different row counts"
);

let nlj = self.nested_loop_join();
let nlj_collected = collect(nlj, task_ctx.clone()).await.unwrap();

Expand All @@ -414,21 +422,20 @@ impl JoinFuzzTestCase {
nlj_formatted.trim().lines().collect();
nlj_formatted_sorted.sort_unstable();

assert_eq!(
smj_formatted_sorted.len(),
hj_formatted_sorted.len(),
"SortMergeJoinExec and HashJoinExec produced different row counts"
);
for (i, (smj_line, hj_line)) in smj_formatted_sorted
.iter()
.zip(&hj_formatted_sorted)
.enumerate()
{
assert_eq!(
(i, smj_line),
(i, hj_line),
"SortMergeJoinExec and HashJoinExec produced different results"
);
// row level compare if any of joins returns the result
// the reason is different formatting when there is no rows
if smj_rows > 0 || hj_rows > 0 {
for (i, (smj_line, hj_line)) in smj_formatted_sorted
.iter()
.zip(&hj_formatted_sorted)
.enumerate()
{
assert_eq!(
(i, smj_line),
(i, hj_line),
"SortMergeJoinExec and HashJoinExec produced different results"
);
}
}

for (i, (nlj_line, hj_line)) in nlj_formatted_sorted
Expand Down

0 comments on commit 908a3a1

Please sign in to comment.