Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: merge into panic #16581

Merged
merged 5 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,10 @@ impl PhysicalPlanBuilder {

let mut field_index_of_input_schema = HashMap::<FieldIndex, usize>::new();
for (field_index, value) in field_index_map {
field_index_of_input_schema
.insert(*field_index, output_schema.index_of(value).unwrap());
// Only record the mapping when the column exists in the output schema; a missing column is skipped instead of unwrapped (fixes issue #16588).
if let Ok(value) = output_schema.index_of(value) {
field_index_of_input_schema.insert(*field_index, value);
}
}

plan = PhysicalPlan::MutationManipulate(Box::new(MutationManipulate {
Expand Down
39 changes: 37 additions & 2 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ use crate::optimizer::DEFAULT_REWRITE_RULES;
use crate::plans::CopyIntoLocationPlan;
use crate::plans::Join;
use crate::plans::JoinType;
use crate::plans::MatchedEvaluator;
use crate::plans::Mutation;
use crate::plans::Operator;
use crate::plans::Plan;
use crate::plans::RelOp;
use crate::plans::RelOperator;
use crate::plans::SetScalarsOrQuery;
use crate::InsertInputSource;
Expand Down Expand Up @@ -504,9 +507,41 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu
let mut mutation: Mutation = s_expr.plan().clone().try_into()?;
mutation.distributed = opt_ctx.enable_distributed_optimization;

let schema = mutation.schema();
// Fix for issue #16588: if the target table has been rewritten as an empty scan,
// the merge condition is always false and the matched branch can never execute.
// The matched evaluators are therefore replaced by a single evaluator with no condition and no update.
let inner_rel_op = input_s_expr.plan.rel_op();
if !mutation.matched_evaluators.is_empty() {
match inner_rel_op {
RelOp::ConstantTableScan => {
mutation.matched_evaluators = vec![MatchedEvaluator {
condition: None,
update: None,
}];
mutation.can_try_update_column_only = false;
}
RelOp::Join => {
let right_child = input_s_expr.child(1)?;
let mut right_child_rel = right_child.plan.rel_op();
if right_child_rel == RelOp::Exchange {
right_child_rel = right_child.child(0)?.plan.rel_op();
}
if right_child_rel == RelOp::ConstantTableScan {
mutation.matched_evaluators = vec![MatchedEvaluator {
condition: None,
update: None,
}];
mutation.can_try_update_column_only = false;
}
}
_ => (),
}
}

input_s_expr = match mutation.mutation_type {
MutationType::Merge => {
if mutation.distributed {
if mutation.distributed && inner_rel_op == RelOp::Join {
let join = Join::try_from(input_s_expr.plan().clone())?;
let broadcast_to_shuffle = BroadcastToShuffleOptimizer::create();
let is_broadcast = broadcast_to_shuffle.matcher.matches(&input_s_expr)
Expand Down Expand Up @@ -537,7 +572,7 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu
};

Ok(Plan::DataMutation {
schema: mutation.schema(),
schema,
s_expr: Box::new(SExpr::create_unary(
Arc::new(RelOperator::Mutation(mutation)),
Arc::new(input_s_expr),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Regression test for issue 16588: MERGE INTO with an always-false ON
# condition (`1 != 1`) must run to completion instead of panicking.

statement ok
create or replace database i16588;

statement ok
use i16588;

statement ok
create table base(a int);

statement ok
create table sink(a int);

# Both a matched and a not-matched clause; both tables are empty.
query II
merge into sink using base on 1 != 1 when matched then update * when not matched then insert *;
----
0 0

# Matched clause only.
query I
merge into sink using base on 1 != 1 when matched then update *;
----
0

statement ok
drop database i16588;
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,61 @@ select * from t2 order by a;
----
8

## issue 16588
query T
explain merge into t1 using (select * from t2) as t on 1 <> 1 when matched then update * when not matched then insert *
----
CommitSink
└── Exchange
├── output columns: []
├── exchange type: Merge
└── DataMutation
├── target table: [catalog: default] [database: default] [table: t1]
├── matched delete: [condition: None]
├── unmatched insert: [condition: None, insert into (a) values(a (#0))]
└── Exchange
├── output columns: [t2.a (#0), t1._row_id (#2)]
├── exchange type: Hash(bit_and(bit_shift_right(t1._row_id (#2), CAST(31 AS UInt64 NULL)), CAST(2047 AS UInt64 NULL)))
└── HashJoin
├── output columns: [t2.a (#0), t1._row_id (#2)]
├── join type: LEFT OUTER
├── build keys: []
├── probe keys: []
├── filters: []
├── estimated rows: 1.00
├── Exchange(Build)
│ ├── output columns: [t1._row_id (#2)]
│ ├── exchange type: Hash()
│ └── EmptyResultScan
└── Exchange(Probe)
├── output columns: [t2.a (#0)]
├── exchange type: Hash()
└── TableScan
├── table: default.default.t2
├── output columns: [a (#0)]
├── read rows: 1
├── read size: < 1 KiB
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 1.00

query T
explain merge into t1 using (select * from t2) as t on 1 <> 1 when matched then update *
----
CommitSink
└── Exchange
├── output columns: []
├── exchange type: Merge
└── DataMutation
├── target table: [catalog: default] [database: default] [table: t1]
├── matched delete: [condition: None]
└── Exchange
├── output columns: [t2.a (#0), t1._row_id (#2)]
├── exchange type: Hash(bit_and(bit_shift_right(t1._row_id (#2), 31), 2047))
└── EmptyResultScan

## check there is add row_number.
query T
explain merge into t1 using (select * from t2) as t on t1.a > t.a when matched then update * when not matched then insert *;
Expand Down
37 changes: 37 additions & 0 deletions tests/sqllogictests/suites/mode/standalone/explain/merge_into.test
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,43 @@ MERGE INTO salaries2 USING (SELECT * FROM employees2) as employees2 ON salaries2
----
2 2

## issue 16588
query T
explain merge into salaries2 using employees2 on 1 != 1 when matched AND employees2.department = 'HR' THEN UPDATE SET salaries2.salary = salaries2.salary + 1000.00 WHEN MATCHED THEN UPDATE SET salaries2.salary = salaries2.salary + 500.00 WHEN NOT MATCHED THEN INSERT (employee_id, salary) VALUES (employees2.employee_id, 55000.00)
----
CommitSink
└── DataMutation
├── target table: [catalog: default] [database: default] [table: salaries2]
├── matched delete: [condition: None]
├── unmatched insert: [condition: None, insert into (employee_id,salary) values(employees2.employee_id (#0),55000.00)]
└── HashJoin
├── output columns: [employees2.employee_id (#0), employees2.employee_name (#1), employees2.department (#2), salaries2._row_id (#5)]
├── join type: LEFT OUTER
├── build keys: []
├── probe keys: []
├── filters: []
├── estimated rows: 4.00
├── EmptyResultScan(Build)
└── TableScan(Probe)
├── table: default.default.employees2
├── output columns: [employee_id (#0), employee_name (#1), department (#2)]
├── read rows: 4
├── read size: < 1 KiB
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 4.00

query T
explain merge into salaries2 using employees2 on 1 != 1 when matched AND employees2.department = 'HR' THEN UPDATE SET salaries2.salary = salaries2.salary + 1000.00 WHEN MATCHED THEN UPDATE SET salaries2.salary = salaries2.salary + 500.00
----
CommitSink
└── DataMutation
├── target table: [catalog: default] [database: default] [table: salaries2]
├── matched delete: [condition: None]
└── EmptyResultScan

query T
explain MERGE INTO salaries2 USING (SELECT * FROM employees2) as employees2 ON salaries2.employee_id = employees2.employee_id WHEN MATCHED AND employees2.department = 'HR' THEN UPDATE SET salaries2.salary = salaries2.salary + 1000.00 WHEN MATCHED THEN UPDATE SET salaries2.salary = salaries2.salary + 500.00 WHEN NOT MATCHED THEN INSERT (employee_id, salary) VALUES (employees2.employee_id, 55000.00);
----
Expand Down
Loading