From a6c2e06c46eef334d5973157906dcbbb8bb1eb04 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Mon, 4 Nov 2024 12:52:22 -0500 Subject: [PATCH] draft right mark --- .../common/src/functional_dependencies.rs | 2 +- datafusion/common/src/join_type.rs | 9 +- datafusion/core/src/dataframe/mod.rs | 4 +- .../enforce_distribution.rs | 7 +- .../src/physical_optimizer/join_selection.rs | 3 + .../src/physical_optimizer/sort_pushdown.rs | 3 +- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 24 +++++ datafusion/expr/src/logical_plan/builder.rs | 4 + datafusion/expr/src/logical_plan/plan.rs | 8 +- datafusion/optimizer/src/analyzer/subquery.rs | 2 +- .../optimizer/src/optimize_projections/mod.rs | 3 +- datafusion/optimizer/src/push_down_filter.rs | 9 +- datafusion/optimizer/src/push_down_limit.rs | 2 +- .../physical-expr/src/equivalence/class.rs | 2 +- .../physical-plan/src/joins/hash_join.rs | 101 ++++++++++++++++++ .../src/joins/nested_loop_join.rs | 31 ++++++ .../src/joins/sort_merge_join.rs | 4 +- .../src/joins/symmetric_hash_join.rs | 27 ++++- datafusion/physical-plan/src/joins/utils.rs | 19 +++- .../proto/datafusion_common.proto | 1 + .../proto-common/src/generated/pbjson.rs | 3 + .../proto-common/src/generated/prost.rs | 3 + .../src/generated/datafusion_proto_common.rs | 3 + .../proto/src/logical_plan/from_proto.rs | 1 + datafusion/proto/src/logical_plan/to_proto.rs | 1 + datafusion/sql/src/unparser/plan.rs | 2 +- .../substrait/src/logical_plan/consumer.rs | 3 +- .../substrait/src/logical_plan/producer.rs | 2 +- 28 files changed, 256 insertions(+), 27 deletions(-) diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 31eafc744390..f22df66a1805 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -338,7 +338,7 @@ impl FunctionalDependencies { // These joins preserve functional dependencies of the left side: left_func_dependencies } - JoinType::RightSemi | JoinType::RightAnti => { + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { // These joins preserve functional dependencies of the right side: right_func_dependencies } diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs index e98f34199b27..5f6798ad586a 100644 --- a/datafusion/common/src/join_type.rs +++ b/datafusion/common/src/join_type.rs @@ -44,7 +44,7 @@ pub enum JoinType { LeftAnti, /// Right Anti Join RightAnti, - /// Left Mark join + /// Left Mark Join /// /// Returns one record for each record from the left input. The output contains an additional /// column "mark" which is true if there is at least one match in the right input where the @@ -58,6 +58,11 @@ pub enum JoinType { /// /// [1]: http://btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf LeftMark, + /// Righ Mark Join + /// + /// Same logic as the LeftMark Join above, however it returns a record for each record from the + /// right input. + RightMark, } impl JoinType { @@ -78,6 +83,7 @@ impl Display for JoinType { JoinType::LeftAnti => "LeftAnti", JoinType::RightAnti => "RightAnti", JoinType::LeftMark => "LeftMark", + JoinType::RightMark => "RightMark", }; write!(f, "{join_type}") } @@ -98,6 +104,7 @@ impl FromStr for JoinType { "LEFTANTI" => Ok(JoinType::LeftAnti), "RIGHTANTI" => Ok(JoinType::RightAnti), "LEFTMARK" => Ok(JoinType::LeftMark), + "RIGHTMARK" => Ok(JoinType::RightMark), _ => _not_impl_err!("The join type {s} does not exist or is not implemented"), } } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 2c71cb80d755..78a04e177a3c 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -3865,6 +3865,7 @@ mod tests { JoinType::LeftAnti, JoinType::RightAnti, JoinType::LeftMark, + JoinType::RightMark, ]; let default_partition_count = SessionConfig::new().target_partitions(); @@ -3898,7 +3899,8 @@ mod tests { JoinType::Inner | JoinType::Right | JoinType::RightSemi - | JoinType::RightAnti => { + | JoinType::RightAnti + | JoinType::RightMark => { let right_exprs: Vec> = vec![ Arc::new(Column::new_with_schema("c2_c1", &join_schema)?), Arc::new(Column::new_with_schema("c2_c2", &join_schema)?), diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index ff8f16f4ee9c..9037ae0c4e36 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -322,7 +322,7 @@ fn adjust_input_keys_ordering( left.schema().fields().len(), ) .unwrap_or_default(), - JoinType::RightSemi | JoinType::RightAnti => { + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { requirements.data.clone() } JoinType::Left @@ -1963,6 +1963,7 @@ pub(crate) mod tests { JoinType::LeftMark, JoinType::RightSemi, JoinType::RightAnti, + JoinType::RightMark, ]; // Join on (a == b1) @@ -2036,7 +2037,7 @@ pub(crate) mod tests { assert_optimized!(expected, top_join.clone(), true); assert_optimized!(expected, top_join, false); } - JoinType::RightSemi | JoinType::RightAnti => {} + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {} } match join_type { @@ -2101,7 +2102,7 @@ pub(crate) mod tests { assert_optimized!(expected, top_join.clone(), true); assert_optimized!(expected, top_join, false); } - JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {} + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark | JoinType::RightMark => {} } } diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 2bf706f33d60..6bbe87345046 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -135,6 +135,9 @@ fn swap_join_type(join_type: JoinType) -> JoinType { JoinType::LeftMark => { unreachable!("LeftMark join type does not support swapping") } + JoinType::RightMark => { + unreachable!("RightMark join type does not support swapping") + } } } diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index fdbda1fe52f7..0b9509e5d767 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -427,7 +427,8 @@ fn expr_source_side( | JoinType::Left | JoinType::Right | JoinType::Full - | JoinType::LeftMark => { + | JoinType::LeftMark + | JoinType::RightMark => { let all_column_sides = required_exprs .iter() .filter_map(|r| { diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index d7a3460e4987..03c1850bcc16 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -258,6 +258,30 @@ async fn test_left_mark_join_1k_filtered() { .await } +#[tokio::test] +async fn test_right_mark_join_1k() { + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::RightMark, + None, + ) + .run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false) + .await +} + +#[tokio::test] +async fn test_right_mark_join_1k_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::RightMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[JoinTestType::HjSmj, JoinTestType::NljHj], false) + .await +} + type JoinFilterBuilder = Box, Arc) -> JoinFilter>; struct JoinFuzzTestCase { diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index b7839c4873af..6c241ee2d48f 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1422,6 +1422,10 @@ pub fn build_join_schema( .map(|(q, f)| (q.cloned(), Arc::clone(f))) .collect() } + JoinType::RightMark => right_fields + .map(|(q, f)| (q.cloned(), Arc::clone(f))) + .chain(once(mark_field(left))) + .collect(), }; let func_dependencies = left.functional_dependencies().join( right.functional_dependencies(), diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 191a42e38e3a..daa35cbb14a2 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -539,7 +539,9 @@ impl LogicalPlan { JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { left.head_output_expr() } - JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + right.head_output_expr() + } }, LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { static_term.head_output_expr() @@ -1309,7 +1311,9 @@ impl LogicalPlan { JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { left.max_rows() } - JoinType::RightSemi | JoinType::RightAnti => right.max_rows(), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + right.max_rows() + } }, LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(), LogicalPlan::Union(Union { inputs, .. }) => inputs diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index fa04835f0967..56e052936e53 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -188,7 +188,7 @@ fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Re check_inner_plan(left, can_contain_outer_ref)?; check_inner_plan(right, false) } - JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { check_inner_plan(left, false)?; check_inner_plan(right, can_contain_outer_ref) } diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index ec2225bbc042..fad6812ad0b1 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -682,7 +682,8 @@ fn split_join_requirements( | JoinType::Left | JoinType::Right | JoinType::Full - | JoinType::LeftMark => { + | JoinType::LeftMark + | JoinType::RightMark => { // Decrease right side indices by `left_len` so that they point to valid // positions within the right child: indices.split_off(left_len) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 269ce2910074..80739edfdaa5 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -162,11 +162,11 @@ pub(crate) fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { JoinType::Right => (false, true), JoinType::Full => (false, false), // No columns from the right side of the join can be referenced in output - // predicates for semi/anti joins, so whether we specify t/f doesn't matter. + // predicates for semi/anti/mark joins, so whether we specify t/f doesn't matter. JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false), // No columns from the left side of the join can be referenced in output - // predicates for semi/anti joins, so whether we specify t/f doesn't matter. - JoinType::RightSemi | JoinType::RightAnti => (false, true), + // predicates for semi/anti/mark joins, so whether we specify t/f doesn't matter. + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => (false, true), } } @@ -189,6 +189,7 @@ pub(crate) fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) { JoinType::LeftAnti => (false, true), JoinType::RightAnti => (true, false), JoinType::LeftMark => (false, true), + JoinType::RightMark => (true, false), } } @@ -740,7 +741,7 @@ fn infer_join_predicates_from_on_filters( inferred_predicates, ) } - JoinType::Right | JoinType::RightSemi => { + JoinType::Right | JoinType::RightSemi | JoinType::RightMark => { infer_join_predicates_impl::( join_col_keys, on_filters, diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 8a3aa4bb8459..0a3900741c0e 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -249,7 +249,7 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed { match join.join_type { Left | Right | Full | Inner => (Some(limit), Some(limit)), LeftAnti | LeftSemi | LeftMark => (Some(limit), None), - RightAnti | RightSemi => (None, Some(limit)), + RightAnti | RightSemi | RightMark => (None, Some(limit)), } } else { match join.join_type { diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 7305bc1b0a2b..c46b8ede32ed 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -633,7 +633,7 @@ impl EquivalenceGroup { result } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => self.clone(), - JoinType::RightSemi | JoinType::RightAnti => right_equivalences.clone(), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => right_equivalences.clone(), } } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index c56c179c17eb..31475cf2f7f8 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -3180,6 +3180,96 @@ mod tests { Ok(()) } + #[apply(batch_sizes)] +#[tokio::test] +async fn join_right_mark(batch_size: usize) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size); + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 7]), // 7 does not exist on the right + ("c1", &vec![7, 8, 9]), + ); + let right = build_table( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), // 6 does not exist on the left + ("c2", &vec![70, 80, 90]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let (columns, batches) = join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::RightMark, + false, + task_ctx, + ) + .await?; + assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]); + + let expected = [ + "+----+----+----+-------+", + "| a2 | b1 | c2 | mark |", + "+----+----+----+-------+", + "| 10 | 4 | 70 | true |", + "| 20 | 5 | 80 | true |", + "| 30 | 6 | 90 | false |", + "+----+----+----+-------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) +} + +#[apply(batch_sizes)] +#[tokio::test] +async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size); + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 7]), // 7 does not exist on the right + ("c1", &vec![7, 8, 9]), + ); + let right = build_table( + ("a2", &vec![10, 20, 30, 40]), + ("b1", &vec![4, 4, 5, 6]), // 6 does not exist on the left + ("c2", &vec![60, 70, 80, 90]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let (columns, batches) = partitioned_join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::RightMark, + false, + task_ctx, + ) + .await?; + assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]); + + let expected = [ + "+----+----+----+-------+", + "| a2 | b1 | c2 | mark |", + "+----+----+----+-------+", + "| 10 | 4 | 60 | true |", + "| 20 | 4 | 70 | true |", + "| 30 | 5 | 80 | true |", + "| 40 | 6 | 90 | false |", + "+----+----+----+-------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) +} + + #[test] fn join_with_hash_collision() -> Result<()> { let mut hashmap_left = RawTable::with_capacity(2); @@ -3574,6 +3664,15 @@ mod tests { "| 3 | 7 | 9 | false |", "+----+----+----+-------+", ]; + let expected_right_mark = vec![ + "+----+----+----+-------+", + "| a2 | b2 | c2 | mark |", + "+----+----+----+-------+", + "| 10 | 4 | 70 | true |", + "| 20 | 5 | 80 | true |", + "| 30 | 6 | 90 | false |", + "+----+----+----+-------+", + ]; let test_cases = vec![ (JoinType::Inner, expected_inner), @@ -3585,6 +3684,7 @@ mod tests { (JoinType::RightSemi, expected_right_semi), (JoinType::RightAnti, expected_right_anti), (JoinType::LeftMark, expected_left_mark), + (JoinType::RightMark, expected_right_mark), ]; for (join_type, expected) in test_cases { @@ -3868,6 +3968,7 @@ mod tests { JoinType::RightSemi, JoinType::RightAnti, JoinType::LeftMark, + JoinType::RightMark, ]; for join_type in join_types { diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 957230f51372..96f64a5a8b2b 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1275,6 +1275,36 @@ pub(crate) mod tests { Ok(()) } + #[tokio::test] + async fn join_right_mark_with_filter() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let left = build_left_table(); + let right = build_right_table(); + + let filter = prepare_join_filter(); + let (columns, batches) = multi_partitioned_join_collect( + left, + right, + &JoinType::RightMark, + Some(filter), + task_ctx, + ) + .await?; + assert_eq!(columns, vec!["a2", "b2", "c2", "mark"]); + let expected = vec![ + "+----+----+-----+-------+", + "| a2 | b2 | c2 | mark |", + "+----+----+-----+-------+", + "| 12 | 10 | 40 | false |", + "| 2 | 2 | 80 | false |", + "+----+----+-----+-------+", + ]; + + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) + } + #[tokio::test] async fn test_overallocation() -> Result<()> { let left = build_table( @@ -1303,6 +1333,7 @@ pub(crate) mod tests { JoinType::LeftMark, JoinType::RightSemi, JoinType::RightAnti, + JoinType::RightMark, ]; for join_type in join_types { diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 20fafcc34773..c1e0e43cd52b 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -173,7 +173,7 @@ impl SortMergeJoinExec { // When output schema contains only the right side, probe side is right. // Otherwise probe side is the left side. match join_type { - JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { JoinSide::Right } JoinType::Inner @@ -193,7 +193,7 @@ impl SortMergeJoinExec { | JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => vec![true, false], - JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { vec![false, true] } _ => vec![false, false], diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 3e0cd48da2bf..a70fc4829def 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -680,7 +680,7 @@ fn need_to_produce_result_in_final(build_side: JoinSide, join_type: JoinType) -> } else { matches!( join_type, - JoinType::Right | JoinType::RightAnti | JoinType::Full | JoinType::RightSemi + JoinType::Right | JoinType::RightAnti | JoinType::Full | JoinType::RightSemi | JoinType::RightMark ) } } @@ -728,6 +728,22 @@ where .collect(); (build_indices, probe_indices) } + (JoinSide::Right, JoinType::RightMark) => { + let probe_indices = (0..prune_length) + .map(R::Native::from_usize) + .collect::>(); + let build_indices = (0..prune_length) + .map(|idx| { + // For mark join we output a dummy index 0 to indicate the row had a match + if visited_rows.contains(&(idx + deleted_offset)) { + Some(L::Native::from_usize(0).unwrap()) + } else { + None + } + }) + .collect(); + (build_indices, probe_indices) + } // In the case of `Left` or `Right` join, or `Full` join, get the anti indices (JoinSide::Left, JoinType::Left | JoinType::LeftAnti) | (JoinSide::Right, JoinType::Right | JoinType::RightAnti) @@ -893,6 +909,7 @@ pub(crate) fn join_with_probe_batch( | JoinType::LeftSemi | JoinType::LeftMark | JoinType::RightSemi + | JoinType::RightMark ) { Ok(None) } else { @@ -1729,6 +1746,7 @@ mod tests { JoinType::LeftAnti, JoinType::LeftMark, JoinType::RightAnti, + JoinType::RightMark, JoinType::Full )] join_type: JoinType, @@ -1814,6 +1832,7 @@ mod tests { JoinType::LeftAnti, JoinType::LeftMark, JoinType::RightAnti, + JoinType::RightMark, JoinType::Full )] join_type: JoinType, @@ -1879,6 +1898,7 @@ mod tests { JoinType::LeftAnti, JoinType::LeftMark, JoinType::RightAnti, + JoinType::RightMark, JoinType::Full )] join_type: JoinType, @@ -1931,6 +1951,7 @@ mod tests { JoinType::LeftAnti, JoinType::LeftMark, JoinType::RightAnti, + JoinType::RightMark, JoinType::Full )] join_type: JoinType, @@ -1959,6 +1980,7 @@ mod tests { JoinType::LeftAnti, JoinType::LeftMark, JoinType::RightAnti, + JoinType::RightMark, JoinType::Full )] join_type: JoinType, @@ -2325,6 +2347,7 @@ mod tests { JoinType::LeftAnti, JoinType::LeftMark, JoinType::RightAnti, + JoinType::RightMark, JoinType::Full )] join_type: JoinType, @@ -2408,6 +2431,7 @@ mod tests { JoinType::LeftAnti, JoinType::LeftMark, JoinType::RightAnti, + JoinType::RightMark, JoinType::Full )] join_type: JoinType, @@ -2483,6 +2507,7 @@ mod tests { JoinType::LeftAnti, JoinType::LeftMark, JoinType::RightAnti, + JoinType::RightMark, JoinType::Full )] join_type: JoinType, diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e7c191f9835e..288e47e53d95 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -621,6 +621,7 @@ fn output_join_field(old_field: &Field, join_type: &JoinType, is_left: bool) -> JoinType::LeftAnti => false, // doesn't introduce nulls (or can it??) JoinType::RightAnti => false, // doesn't introduce nulls (or can it??) JoinType::LeftMark => false, + JoinType::RightMark => false, }; if force_nullable { @@ -687,6 +688,16 @@ pub fn build_join_schema( left_fields().chain(right_field).unzip() } JoinType::RightSemi | JoinType::RightAnti => right_fields().unzip(), + JoinType::RightMark => { + let left_field = once(( + Field::new("mark", arrow_schema::DataType::Boolean, false), + ColumnIndex { + index: 0, // 'mark' is not associated with either side + side: JoinSide::None, + }, + )); + right_fields().chain(left_field).unzip() + } }; let metadata = left @@ -891,7 +902,7 @@ fn estimate_join_cardinality( }) } - JoinType::LeftMark => { + JoinType::LeftMark | JoinType::RightMark => { let num_rows = *left_stats.num_rows.get_value()?; let mut column_statistics = left_stats.column_statistics; column_statistics.push(ColumnStatistics::new_unknown()); @@ -1335,7 +1346,7 @@ pub(crate) fn adjust_indices_by_join_type( // the left_indices will not be used later for the `right anti` join Ok((left_indices, right_indices)) } - JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark | JoinType::RightMark => { // matched or unmatched left row will be produced in the end of loop // When visit the right batch, we can output the matched left row and don't need to wait the end of loop Ok(( @@ -1661,7 +1672,7 @@ pub(crate) fn symmetric_join_output_partitioning( JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { left_partitioning.clone() } - JoinType::RightSemi | JoinType::RightAnti => right_partitioning.clone(), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => right_partitioning.clone(), JoinType::Inner | JoinType::Right => { adjust_right_output_partitioning(right_partitioning, left_columns_len) } @@ -1682,7 +1693,7 @@ pub(crate) fn asymmetric_join_output_partitioning( right.output_partitioning(), left.schema().fields().len(), ), - JoinType::RightSemi | JoinType::RightAnti => right.output_partitioning().clone(), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => right.output_partitioning().clone(), JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 65cd33d523cd..94135caec777 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -85,6 +85,7 @@ enum JoinType { RIGHTSEMI = 6; RIGHTANTI = 7; LEFTMARK = 8; + RIGHTMARK = 9; } enum JoinConstraint { diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e8235ef7b9dd..8eae5747dce5 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -3842,6 +3842,7 @@ impl serde::Serialize for JoinType { Self::Rightsemi => "RIGHTSEMI", Self::Rightanti => "RIGHTANTI", Self::Leftmark => "LEFTMARK", + Self::Rightmark => "RIGHTMARK", }; serializer.serialize_str(variant) } @@ -3862,6 +3863,7 @@ impl<'de> serde::Deserialize<'de> for JoinType { "RIGHTSEMI", "RIGHTANTI", "LEFTMARK", + "RIGHTMARK", ]; struct GeneratedVisitor; @@ -3911,6 +3913,7 @@ impl<'de> serde::Deserialize<'de> for JoinType { "RIGHTSEMI" => Ok(JoinType::Rightsemi), "RIGHTANTI" => Ok(JoinType::Rightanti), "LEFTMARK" => Ok(JoinType::Leftmark), + "RIGHTMARK" => Ok(JoinTYpe::Rightmark), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 68e7f74c7f49..50a2133489ec 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -884,6 +884,7 @@ pub enum JoinType { Rightsemi = 6, Rightanti = 7, Leftmark = 8, + Rightmark = 9, } impl JoinType { /// String value of the enum field names used in the ProtoBuf definition. @@ -901,6 +902,7 @@ impl JoinType { Self::Rightsemi => "RIGHTSEMI", Self::Rightanti => "RIGHTANTI", Self::Leftmark => "LEFTMARK", + Self::Rightmark => "RIGHTMARK", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -915,6 +917,7 @@ impl JoinType { "RIGHTSEMI" => Some(Self::Rightsemi), "RIGHTANTI" => Some(Self::Rightanti), "LEFTMARK" => Some(Self::Leftmark), + "RIGHTMARK" => Some(Self::Rightmark), _ => None, } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 68e7f74c7f49..50a2133489ec 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -884,6 +884,7 @@ pub enum JoinType { Rightsemi = 6, Rightanti = 7, Leftmark = 8, + Rightmark = 9, } impl JoinType { /// String value of the enum field names used in the ProtoBuf definition. @@ -901,6 +902,7 @@ impl JoinType { Self::Rightsemi => "RIGHTSEMI", Self::Rightanti => "RIGHTANTI", Self::Leftmark => "LEFTMARK", + Self::Rightmark => "RIGHTMARK", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -915,6 +917,7 @@ impl JoinType { "RIGHTSEMI" => Some(Self::Rightsemi), "RIGHTANTI" => Some(Self::Rightanti), "LEFTMARK" => Some(Self::Leftmark), + "RIGHTMARK" => Some(Self::Rightmark), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index f25fb0bf2561..42fbacd3e66b 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -214,6 +214,7 @@ impl From for JoinType { protobuf::JoinType::Leftanti => JoinType::LeftAnti, protobuf::JoinType::Rightanti => JoinType::RightAnti, protobuf::JoinType::Leftmark => JoinType::LeftMark, + protobuf::JoinType::Rightmark => JoinType::RightMark, } } } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 8af7b19d9091..4e4ac63507d4 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -686,6 +686,7 @@ impl From for protobuf::JoinType { JoinType::LeftAnti => protobuf::JoinType::Leftanti, JoinType::RightAnti => protobuf::JoinType::Rightanti, JoinType::LeftMark => protobuf::JoinType::Leftmark, + JoinType::RightMark => protobuf::JoinType::Rightmark, } } } diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 8167ddacffb4..ee111be19d1e 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -866,7 +866,7 @@ impl Unparser<'_> { JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint), JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint), JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint), - JoinType::LeftMark => unimplemented!("Unparsing of Left Mark join type"), + JoinType::LeftMark | JoinType::RightMark => unimplemented!("Unparsing of Left Mark join type"), }) } diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index a12406bd3439..5bfdc3ed9921 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -1128,7 +1128,7 @@ fn retrieve_emit_kind(rel_common: Option<&RelCommon>) -> EmitKind { fn contains_volatile_expr(proj: &Projection) -> Result { for expr in proj.expr.iter() { - if expr.is_volatile()? { + if expr.is_volatile() { return Ok(true); } } @@ -1375,6 +1375,7 @@ fn from_substrait_jointype(join_type: i32) -> Result { join_rel::JoinType::LeftAnti => Ok(JoinType::LeftAnti), join_rel::JoinType::LeftSemi => Ok(JoinType::LeftSemi), join_rel::JoinType::LeftMark => Ok(JoinType::LeftMark), + join_rel::JoinType::RightMark => Ok(JoinType::RightMark), _ => plan_err!("unsupported join type {substrait_join_type:?}"), } } else { diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index c73029f130ad..520a05f23f41 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -726,7 +726,7 @@ fn to_substrait_jointype(join_type: JoinType) -> join_rel::JoinType { JoinType::LeftAnti => join_rel::JoinType::LeftAnti, JoinType::LeftSemi => join_rel::JoinType::LeftSemi, JoinType::LeftMark => join_rel::JoinType::LeftMark, - JoinType::RightAnti | JoinType::RightSemi => { + JoinType::RightAnti | JoinType::RightSemi | JoinType::RightMark => { unimplemented!() } }