Skip to content

Commit

Permalink
adjusted tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanc-n committed Nov 5, 2024
1 parent 03d1aa1 commit feac971
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 37 deletions.
24 changes: 0 additions & 24 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,30 +258,6 @@ async fn test_left_mark_join_1k_filtered() {
.await
}

#[tokio::test]
async fn test_right_mark_join_1k() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::RightMark,
None,
)
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
.await
}

#[tokio::test]
async fn test_right_mark_join_1k_filtered() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::RightMark,
Some(Box::new(col_lt_col_filter)),
)
.run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false)
.await
}

type JoinFilterBuilder = Box<dyn Fn(Arc<Schema>, Arc<Schema>) -> JoinFilter>;

struct JoinFuzzTestCase {
Expand Down
2 changes: 0 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,6 @@ fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
.filter_map(|(qualifier, _)| qualifier)
.collect::<Vec<_>>();
table_references.dedup();
println!("MARKIN FIELDS");
let table_reference = if table_references.len() == 1 {
table_references.pop().cloned()
} else {
Expand Down Expand Up @@ -1428,7 +1427,6 @@ pub fn build_join_schema(
.chain(once(mark_field(left)))
.collect(),
};
// println!("{:?}", right_fields.);
let func_dependencies = left.functional_dependencies().join(
right.functional_dependencies(),
join_type,
Expand Down
15 changes: 8 additions & 7 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,20 +678,21 @@ fn split_join_requirements(
) -> (RequiredIndicies, RequiredIndicies) {
match join_type {
// In these cases requirements are split between left/right children:
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
JoinType::Inner
| JoinType::Left
| JoinType::Right
| JoinType::Full
| JoinType::LeftMark
| JoinType::RightMark => {
// Decrease right side indices by `left_len` so that they point to valid
// positions within the right child:
indices.split_off(left_len)
}
// All requirements can be re-routed to left child directly.
JoinType::LeftAnti | JoinType::LeftSemi | JoinType::LeftMark => {
(indices, RequiredIndicies::new())
}
JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndicies::new()),
// All requirements can be re-routed to right side directly.
// No need to change index, join schema is right child schema.
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
(RequiredIndicies::new(), indices)
}
JoinType::RightSemi | JoinType::RightAnti => (RequiredIndicies::new(), indices),
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ fn get_corrected_filter_mask(
corrected_mask.extend(vec![Some(false); null_matched]);
Some(corrected_mask.finish())
}
JoinType::LeftMark => {
JoinType::LeftMark | JoinType::RightMark => {
for i in 0..row_indices_length {
let last_index =
last_index_for_row(i, row_indices, batch_ids, row_indices_length);
Expand Down
4 changes: 1 addition & 3 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1357,9 +1357,7 @@ pub(crate) fn adjust_indices_by_join_type(
// the left_indices will not be used later for the `right anti` join
Ok((left_indices, right_indices))
}
JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::LeftMark => {
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
// matched or unmatched left row will be produced in the end of loop
// When visit the right batch, we can output the matched left row and don't need to wait the end of loop
Ok((
Expand Down

0 comments on commit feac971

Please sign in to comment.