Skip to content

Commit

Permalink
Move SMJ join filtered part out of join_output stage. LeftOuter, LeftSemi (apache#12764)
Browse files Browse the repository at this point in the history

* WIP: move filtered join out of join_output stage

* WIP: move filtered join out of join_output stage

* WIP: move filtered join out of join_output stage

* cleanup

* cleanup

* Move Left/LeftAnti filtered SMJ join out of join partial stage

* Move Left/LeftAnti filtered SMJ join out of join partial stage

* Address comments
  • Loading branch information
comphead authored Oct 18, 2024
1 parent 42f9060 commit 3405234
Show file tree
Hide file tree
Showing 3 changed files with 1,061 additions and 524 deletions.
12 changes: 4 additions & 8 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,14 @@ async fn test_left_join_1k() {
}

#[tokio::test]
// flaky for HjSmj case
// https://github.com/apache/datafusion/issues/12359
async fn test_left_join_1k_filtered() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::Left,
Some(Box::new(col_lt_col_filter)),
)
.run_test(&[JoinTestType::NljHj], false)
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
.await
}

Expand Down Expand Up @@ -229,6 +227,7 @@ async fn test_anti_join_1k() {
#[tokio::test]
// flaky for the HjSmj case, sometimes giving a 1-row difference
// https://github.com/apache/datafusion/issues/11555
#[ignore]
async fn test_anti_join_1k_filtered() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
Expand Down Expand Up @@ -515,14 +514,11 @@ impl JoinFuzzTestCase {
"input2",
);

if join_tests.contains(&JoinTestType::NljHj)
&& join_tests.contains(&JoinTestType::NljHj)
&& nlj_rows != hj_rows
{
if join_tests.contains(&JoinTestType::NljHj) && nlj_rows != hj_rows {
println!("=============== HashJoinExec ==================");
hj_formatted_sorted.iter().for_each(|s| println!("{}", s));
println!("=============== NestedLoopJoinExec ==================");
smj_formatted_sorted.iter().for_each(|s| println!("{}", s));
nlj_formatted_sorted.iter().for_each(|s| println!("{}", s));

Self::save_partitioned_batches_as_parquet(
&nlj_collected,
Expand Down
Loading

0 comments on commit 3405234

Please sign in to comment.