From d3ea7791bb688e61d2540579200c8bdbe518fb99 Mon Sep 17 00:00:00 2001 From: zhyass Date: Thu, 10 Oct 2024 00:25:54 +0800 Subject: [PATCH 1/5] fix merge into panic --- .../sql/src/executor/physical_plans/physical_mutation.rs | 5 +++-- src/query/sql/src/planner/optimizer/optimizer.rs | 8 ++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/query/sql/src/executor/physical_plans/physical_mutation.rs b/src/query/sql/src/executor/physical_plans/physical_mutation.rs index 36342adf6329..ed3c3c3c3fdd 100644 --- a/src/query/sql/src/executor/physical_plans/physical_mutation.rs +++ b/src/query/sql/src/executor/physical_plans/physical_mutation.rs @@ -370,8 +370,9 @@ impl PhysicalPlanBuilder { let mut field_index_of_input_schema = HashMap::::new(); for (field_index, value) in field_index_map { - field_index_of_input_schema - .insert(*field_index, output_schema.index_of(value).unwrap()); + if let Ok(value) = output_schema.index_of(value) { + field_index_of_input_schema.insert(*field_index, value); + } } plan = PhysicalPlan::MutationManipulate(Box::new(MutationManipulate { diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 2e393959b945..0d4cca904fda 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -50,7 +50,9 @@ use crate::plans::CopyIntoLocationPlan; use crate::plans::Join; use crate::plans::JoinType; use crate::plans::Mutation; +use crate::plans::Operator; use crate::plans::Plan; +use crate::plans::RelOp; use crate::plans::RelOperator; use crate::plans::SetScalarsOrQuery; use crate::InsertInputSource; @@ -503,6 +505,12 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu let mut mutation: Mutation = s_expr.plan().clone().try_into()?; mutation.distributed = opt_ctx.enable_distributed_optimization; + if input_s_expr.plan.rel_op() == RelOp::Join { + let right_child = input_s_expr.child(1)?; + if right_child.plan.rel_op() == RelOp::ConstantTableScan { + mutation.matched_evaluators.clear(); + } + } input_s_expr = match mutation.mutation_type { MutationType::Merge => { From ecf10f5e3fbfde8b56730bdac7cf1cd68b04d0a4 Mon Sep 17 00:00:00 2001 From: zhyass Date: Thu, 10 Oct 2024 01:47:51 +0800 Subject: [PATCH 2/5] add test --- .../physical_plans/physical_mutation.rs | 1 + .../sql/src/planner/optimizer/optimizer.rs | 18 +++++++++++--- .../09_0042_merge_into_issue_16581.test | 24 +++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) create mode 100644 tests/sqllogictests/suites/base/09_fuse_engine/09_0042_merge_into_issue_16581.test diff --git a/src/query/sql/src/executor/physical_plans/physical_mutation.rs b/src/query/sql/src/executor/physical_plans/physical_mutation.rs index ed3c3c3c3fdd..6a18464fa899 100644 --- a/src/query/sql/src/executor/physical_plans/physical_mutation.rs +++ b/src/query/sql/src/executor/physical_plans/physical_mutation.rs @@ -370,6 +370,7 @@ impl PhysicalPlanBuilder { let mut field_index_of_input_schema = HashMap::::new(); for (field_index, value) in field_index_map { + // Safe to set field index, to fix issue #16581. if let Ok(value) = output_schema.index_of(value) { field_index_of_input_schema.insert(*field_index, value); } diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 0d4cca904fda..d8d007d000f4 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -505,11 +505,23 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu let mut mutation: Mutation = s_expr.plan().clone().try_into()?; mutation.distributed = opt_ctx.enable_distributed_optimization; - if input_s_expr.plan.rel_op() == RelOp::Join { - let right_child = input_s_expr.child(1)?; - if right_child.plan.rel_op() == RelOp::ConstantTableScan { + + // To fix issue #16581, if target table is rewritten as an empty scan, that means + // the condition is false and the match branch can never be executed. + // Therefore, the match evaluators can be cleared. + match input_s_expr.plan.rel_op() { + RelOp::ConstantTableScan => { mutation.matched_evaluators.clear(); + mutation.strategy = MutationStrategy::NotMatchedOnly; + } + RelOp::Join => { + let right_child = input_s_expr.child(1)?; + if right_child.plan.rel_op() == RelOp::ConstantTableScan { + mutation.matched_evaluators.clear(); + mutation.strategy = MutationStrategy::NotMatchedOnly; + } } + _ => (), } input_s_expr = match mutation.mutation_type { diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0042_merge_into_issue_16581.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0042_merge_into_issue_16581.test new file mode 100644 index 000000000000..4711d650b3a4 --- /dev/null +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0042_merge_into_issue_16581.test @@ -0,0 +1,24 @@ +statement ok +create or replace database i16581; + +statement ok +use i16581; + +statement ok +create table base(a int); + +statement ok +create table sink(a int); + +query I +merge into sink using base on 1 != 1 when matched then update * when not matched then insert *; +---- +0 + +query I +merge into sink using base on 1 != 1 when matched then update *; +---- +0 + +statement ok +drop database i16581; From 1360aedd4a7c0d03a900909fce0359dce3610a6d Mon Sep 17 00:00:00 2001 From: zhyass Date: Thu, 10 Oct 2024 14:10:03 +0800 Subject: [PATCH 3/5] fix --- .../physical_plans/physical_mutation.rs | 2 +- .../sql/src/planner/optimizer/optimizer.rs | 38 ++++++++++++------- ...st => 09_0042_merge_into_issue_16588.test} | 10 ++--- 3 files changed, 30 insertions(+), 20 deletions(-) rename tests/sqllogictests/suites/base/09_fuse_engine/{09_0042_merge_into_issue_16581.test => 09_0042_merge_into_issue_16588.test} (78%) diff --git a/src/query/sql/src/executor/physical_plans/physical_mutation.rs b/src/query/sql/src/executor/physical_plans/physical_mutation.rs index 6a18464fa899..df8573418c55 100644 --- a/src/query/sql/src/executor/physical_plans/physical_mutation.rs +++ b/src/query/sql/src/executor/physical_plans/physical_mutation.rs @@ -370,7 +370,7 @@ impl PhysicalPlanBuilder { let mut field_index_of_input_schema = HashMap::::new(); for (field_index, value) in field_index_map { - // Safe to set field index, to fix issue #16581. + // Safe to set field index, to fix issue #16588. if let Ok(value) = output_schema.index_of(value) { field_index_of_input_schema.insert(*field_index, value); } diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index d8d007d000f4..cbd430038ee9 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -49,6 +49,7 @@ use crate::optimizer::DEFAULT_REWRITE_RULES; use crate::plans::CopyIntoLocationPlan; use crate::plans::Join; use crate::plans::JoinType; +use crate::plans::MatchedEvaluator; use crate::plans::Mutation; use crate::plans::Operator; use crate::plans::Plan; @@ -506,22 +507,31 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu let mut mutation: Mutation = s_expr.plan().clone().try_into()?; mutation.distributed = opt_ctx.enable_distributed_optimization; - // To fix issue #16581, if target table is rewritten as an empty scan, that means + let schema = mutation.schema(); + // To fix issue #16588, if target table is rewritten as an empty scan, that means // the condition is false and the match branch can never be executed. - // Therefore, the match evaluators can be cleared. - match input_s_expr.plan.rel_op() { - RelOp::ConstantTableScan => { - mutation.matched_evaluators.clear(); - mutation.strategy = MutationStrategy::NotMatchedOnly; - } - RelOp::Join => { - let right_child = input_s_expr.child(1)?; - if right_child.plan.rel_op() == RelOp::ConstantTableScan { - mutation.matched_evaluators.clear(); - mutation.strategy = MutationStrategy::NotMatchedOnly; + // Therefore, the match evaluators can be reset. + if !mutation.matched_evaluators.is_empty() { + match input_s_expr.plan.rel_op() { + RelOp::ConstantTableScan => { + mutation.matched_evaluators = vec![MatchedEvaluator { + condition: None, + update: None, + }]; + mutation.can_try_update_column_only = false; + } + RelOp::Join => { + let right_child = input_s_expr.child(1)?; + if right_child.plan.rel_op() == RelOp::ConstantTableScan { + mutation.matched_evaluators = vec![MatchedEvaluator { + condition: None, + update: None, + }]; + mutation.can_try_update_column_only = false; + } } + _ => (), } - _ => (), } input_s_expr = match mutation.mutation_type { @@ -557,7 +567,7 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu }; Ok(Plan::DataMutation { - schema: mutation.schema(), + schema, s_expr: Box::new(SExpr::create_unary( Arc::new(RelOperator::Mutation(mutation)), Arc::new(input_s_expr), diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0042_merge_into_issue_16581.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0042_merge_into_issue_16588.test similarity index 78% rename from tests/sqllogictests/suites/base/09_fuse_engine/09_0042_merge_into_issue_16581.test rename to tests/sqllogictests/suites/base/09_fuse_engine/09_0042_merge_into_issue_16588.test index 4711d650b3a4..b651fc3cb3c9 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0042_merge_into_issue_16581.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0042_merge_into_issue_16588.test @@ -1,8 +1,8 @@ statement ok -create or replace database i16581; +create or replace database i16588; statement ok -use i16581; +use i16588; statement ok create table base(a int); @@ -10,10 +10,10 @@ create table base(a int); statement ok create table sink(a int); -query I +query II merge into sink using base on 1 != 1 when matched then update * when not matched then insert *; ---- -0 +0 0 query I merge into sink using base on 1 != 1 when matched then update *; @@ -21,4 +21,4 @@ merge into sink using base on 1 != 1 when matched then update *; 0 statement ok -drop database i16581; +drop database i16588; From 957ef1ab16d832647d996695410e58a78fca30da Mon Sep 17 00:00:00 2001 From: zhyass Date: Thu, 10 Oct 2024 16:29:25 +0800 Subject: [PATCH 4/5] fix --- src/query/sql/src/planner/optimizer/optimizer.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index cbd430038ee9..674d01b1da61 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -511,8 +511,9 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu // To fix issue #16588, if target table is rewritten as an empty scan, that means // the condition is false and the match branch can never be executed. // Therefore, the match evaluators can be reset. + let inner_rel_op = input_s_expr.plan.rel_op(); if !mutation.matched_evaluators.is_empty() { - match input_s_expr.plan.rel_op() { + match inner_rel_op { RelOp::ConstantTableScan => { mutation.matched_evaluators = vec![MatchedEvaluator { condition: None, @@ -522,7 +523,11 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu } RelOp::Join => { let right_child = input_s_expr.child(1)?; - if right_child.plan.rel_op() == RelOp::ConstantTableScan { + let mut right_child_rel = right_child.plan.rel_op(); + if right_child_rel == RelOp::Exchange { + right_child_rel = right_child.child(0)?.plan.rel_op(); + } + if right_child_rel == RelOp::ConstantTableScan { mutation.matched_evaluators = vec![MatchedEvaluator { condition: None, update: None, @@ -536,7 +541,7 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu input_s_expr = match mutation.mutation_type { MutationType::Merge => { - if mutation.distributed { + if mutation.distributed && inner_rel_op == RelOp::Join { let join = Join::try_from(input_s_expr.plan().clone())?; let broadcast_to_shuffle = BroadcastToShuffleOptimizer::create(); let is_broadcast = broadcast_to_shuffle.matcher.matches(&input_s_expr) From 4b4c3c05dcd170c86b8ae4b96a837dca169c3cf0 Mon Sep 17 00:00:00 2001 From: zhyass Date: Thu, 10 Oct 2024 23:01:26 +0800 Subject: [PATCH 5/5] add test --- .../merge_into_non_equal_distributed.test | 55 +++++++++++++++++++ .../mode/standalone/explain/merge_into.test | 37 +++++++++++++ 2 files changed, 92 insertions(+) diff --git a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test index 27285e1dc406..ac255816b130 100644 --- a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test +++ b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test @@ -52,6 +52,61 @@ select * from t2 order by a; ---- 8 +## issue 16588 +query T +explain merge into t1 using (select * from t2) as t on 1 <> 1 when matched then update * when not matched then insert * +---- +CommitSink +└── Exchange + ├── output columns: [] + ├── exchange type: Merge + └── DataMutation + ├── target table: [catalog: default] [database: default] [table: t1] + ├── matched delete: [condition: None] + ├── unmatched insert: [condition: None, insert into (a) values(a (#0))] + └── Exchange + ├── output columns: [t2.a (#0), t1._row_id (#2)] + ├── exchange type: Hash(bit_and(bit_shift_right(t1._row_id (#2), CAST(31 AS UInt64 NULL)), CAST(2047 AS UInt64 NULL))) + └── HashJoin + ├── output columns: [t2.a (#0), t1._row_id (#2)] + ├── join type: LEFT OUTER + ├── build keys: [] + ├── probe keys: [] + ├── filters: [] + ├── estimated rows: 1.00 + ├── Exchange(Build) + │ ├── output columns: [t1._row_id (#2)] + │ ├── exchange type: Hash() + │ └── EmptyResultScan + └── Exchange(Probe) + ├── output columns: [t2.a (#0)] + ├── exchange type: Hash() + └── TableScan + ├── table: default.default.t2 + ├── output columns: [a (#0)] + ├── read rows: 1 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 1.00 + +query T +explain merge into t1 using (select * from t2) as t on 1 <> 1 when matched then update * +---- +CommitSink +└── Exchange + ├── output columns: [] + ├── exchange type: Merge + └── DataMutation + ├── target table: [catalog: default] [database: default] [table: t1] + ├── matched delete: [condition: None] + └── Exchange + ├── output columns: [t2.a (#0), t1._row_id (#2)] + ├── exchange type: Hash(bit_and(bit_shift_right(t1._row_id (#2), 31), 2047)) + └── EmptyResultScan + ## check there is add row_number. query T explain merge into t1 using (select * from t2) as t on t1.a > t.a when matched then update * when not matched then insert *; diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test index 3d3949cd54e5..502b6ebc8c6e 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test @@ -24,6 +24,43 @@ MERGE INTO salaries2 USING (SELECT * FROM employees2) as employees2 ON salaries2 ---- 2 2 +## issue 16588 +query T +explain merge into salaries2 using employees2 on 1 != 1 when matched AND employees2.department = 'HR' THEN UPDATE SET salaries2.salary = salaries2.salary + 1000.00 WHEN MATCHED THEN UPDATE SET salaries2.salary = salaries2.salary + 500.00 WHEN NOT MATCHED THEN INSERT (employee_id, salary) VALUES (employees2.employee_id, 55000.00) +---- +CommitSink +└── DataMutation + ├── target table: [catalog: default] [database: default] [table: salaries2] + ├── matched delete: [condition: None] + ├── unmatched insert: [condition: None, insert into (employee_id,salary) values(employees2.employee_id (#0),55000.00)] + └── HashJoin + ├── output columns: [employees2.employee_id (#0), employees2.employee_name (#1), employees2.department (#2), salaries2._row_id (#5)] + ├── join type: LEFT OUTER + ├── build keys: [] + ├── probe keys: [] + ├── filters: [] + ├── estimated rows: 4.00 + ├── EmptyResultScan(Build) + └── TableScan(Probe) + ├── table: default.default.employees2 + ├── output columns: [employee_id (#0), employee_name (#1), department (#2)] + ├── read rows: 4 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 4.00 + +query T +explain merge into salaries2 using employees2 on 1 != 1 when matched AND employees2.department = 'HR' THEN UPDATE SET salaries2.salary = salaries2.salary + 1000.00 WHEN MATCHED THEN UPDATE SET salaries2.salary = salaries2.salary + 500.00 +---- +CommitSink +└── DataMutation + ├── target table: [catalog: default] [database: default] [table: salaries2] + ├── matched delete: [condition: None] + └── EmptyResultScan + query T explain MERGE INTO salaries2 USING (SELECT * FROM employees2) as employees2 ON salaries2.employee_id = employees2.employee_id WHEN MATCHED AND employees2.department = 'HR' THEN UPDATE SET salaries2.salary = salaries2.salary + 1000.00 WHEN MATCHED THEN UPDATE SET salaries2.salary = salaries2.salary + 500.00 WHEN NOT MATCHED THEN INSERT (employee_id, salary) VALUES (employees2.employee_id, 55000.00); ----