Commit
Merge branch 'main' into feature_fix
zhyass authored Sep 24, 2024
2 parents 8037679 + 8e79625 commit 85d834b
Showing 24 changed files with 117 additions and 138 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

47 changes: 8 additions & 39 deletions src/meta/api/src/schema_api_impl.rs
@@ -3613,18 +3613,17 @@ fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp {
 #[fastrace::trace]
 async fn batch_filter_table_info(
     kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
-    filter_db_info_with_table_name_list: &[(&TableInfoFilter, &Arc<DatabaseInfo>, u64, &String)],
+    args: &[(&TableInfoFilter, &Arc<DatabaseInfo>, u64, String)],
     filter_tb_infos: &mut Vec<(Arc<TableInfo>, u64)>,
 ) -> Result<(), KVAppError> {
-    let table_id_idents = filter_db_info_with_table_name_list
+    let table_id_idents = args
         .iter()
         .map(|(_f, _db, table_id, _table_name)| TableId::new(*table_id));
 
     let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?;
 
-    for (seq_meta, (filter, db_info, table_id, table_name)) in seq_metas
-        .into_iter()
-        .zip(filter_db_info_with_table_name_list.iter())
+    for (seq_meta, (filter, db_info, table_id, table_name)) in
+        seq_metas.into_iter().zip(args.iter())
     {
         let Some(seq_meta) = seq_meta else {
             error!(
@@ -3671,42 +3670,12 @@ async fn get_gc_table_info(
     limit: usize,
     table_id_list: &TableFilterInfoList<'_>,
 ) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
-    let mut filter_tb_infos = vec![];
-
-    let mut filter_db_info_with_table_name_list: Vec<(
-        &TableInfoFilter,
-        &Arc<DatabaseInfo>,
-        u64,
-        &String,
-    )> = vec![];
-
-    for (filter, db_info, table_id, table_name) in table_id_list {
-        filter_db_info_with_table_name_list.push((filter, db_info, *table_id, table_name));
-        if filter_db_info_with_table_name_list.len() < DEFAULT_MGET_SIZE {
-            continue;
-        }
-
-        batch_filter_table_info(
-            kv_api,
-            &filter_db_info_with_table_name_list,
-            &mut filter_tb_infos,
-        )
-        .await?;
-
-        filter_db_info_with_table_name_list.clear();
-
-        if filter_tb_infos.len() >= limit {
-            return Ok(filter_tb_infos);
-        }
-    }
-
-    if !filter_db_info_with_table_name_list.is_empty() {
-        batch_filter_table_info(
-            kv_api,
-            &filter_db_info_with_table_name_list,
-            &mut filter_tb_infos,
-        )
-        .await?;
-    }
+    let table_id_list = &table_id_list[..std::cmp::min(limit, table_id_list.len())];
+
+    let mut filter_tb_infos = vec![];
+
+    for chunk in table_id_list.chunks(DEFAULT_MGET_SIZE) {
+        batch_filter_table_info(kv_api, chunk, &mut filter_tb_infos).await?;
+    }
 
     Ok(filter_tb_infos)
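The rewrite of `get_gc_table_info` above replaces the hand-rolled accumulate-and-flush loop with an up-front slice that applies `limit` and a `chunks(DEFAULT_MGET_SIZE)` iteration over the candidates. A minimal, self-contained sketch of that pattern follows; `collect_with_limit`, `fetch_batch`, and the `u64` ids are hypothetical stand-ins for `get_gc_table_info`, `batch_filter_table_info`, and the real table metadata, and the constant's value is illustrative.

```rust
// Sketch of the chunking pattern: cap the candidate list first, then process
// it in fixed-size batches instead of counting and flushing by hand.
const DEFAULT_MGET_SIZE: usize = 256; // illustrative value

// Stand-in for `batch_filter_table_info`: pretend every id in the chunk
// survives filtering and is appended to the output.
fn fetch_batch(chunk: &[u64], out: &mut Vec<u64>) {
    out.extend_from_slice(chunk);
}

// Stand-in for `get_gc_table_info`.
fn collect_with_limit(table_ids: &[u64], limit: usize) -> Vec<u64> {
    // Apply the limit by slicing, mirroring the new code above.
    let table_ids = &table_ids[..table_ids.len().min(limit)];
    let mut results = Vec::new();
    for chunk in table_ids.chunks(DEFAULT_MGET_SIZE) {
        fetch_batch(chunk, &mut results);
    }
    results
}

fn main() {
    let ids: Vec<u64> = (0..1_000).collect();
    let got = collect_with_limit(&ids, 300);
    assert_eq!(got.len(), 300);
    println!("collected {} ids in chunks of {}", got.len(), DEFAULT_MGET_SIZE);
}
```

One visible behavioral difference in the diff: the old loop stopped once `filter_tb_infos.len() >= limit`, whereas the new code bounds the number of candidate ids before filtering.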
2 changes: 1 addition & 1 deletion src/query/expression/src/utils/display.rs
@@ -755,7 +755,7 @@ impl<Index: ColumnIndex> Expr<Index> {
         precedence: usize,
         min_precedence: usize,
     ) -> String {
-        if precedence < min_precedence {
+        if precedence < min_precedence || matches!(op, "AND" | "OR") {
             format!(
                 "({} {op} {})",
                 write_expr(lhs, precedence),
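The change above makes the expression printer wrap AND/OR nodes in parentheses unconditionally, not only when their precedence falls below the surrounding context; that is what adds the extra parentheses in the test expectations further down. A toy sketch of the rule, using an illustrative expression type and precedence table rather than Databend's `Expr<Index>`:

```rust
// Minimal precedence-aware pretty-printer: AND/OR are always parenthesized,
// other operators only when required by the surrounding precedence.
enum Expr {
    Col(&'static str),
    Binary(&'static str, Box<Expr>, Box<Expr>),
}

fn precedence(op: &str) -> usize {
    match op {
        "OR" => 1,
        "AND" => 2,
        ">" | "<" | "=" => 3,
        _ => 4,
    }
}

fn write_expr(expr: &Expr, min_precedence: usize) -> String {
    match expr {
        Expr::Col(name) => name.to_string(),
        Expr::Binary(op, lhs, rhs) => {
            let prec = precedence(op);
            let body = format!(
                "{} {op} {}",
                write_expr(lhs, prec),
                write_expr(rhs, prec + 1)
            );
            // Mirror of the changed condition: force parentheses for AND/OR.
            if prec < min_precedence || matches!(*op, "AND" | "OR") {
                format!("({body})")
            } else {
                body
            }
        }
    }
}

fn main() {
    // a > 3 OR (b > 5 AND a > 1)
    let e = Expr::Binary(
        "OR",
        Box::new(Expr::Binary(
            ">",
            Box::new(Expr::Col("a")),
            Box::new(Expr::Col("3")),
        )),
        Box::new(Expr::Binary(
            "AND",
            Box::new(Expr::Binary(
                ">",
                Box::new(Expr::Col("b")),
                Box::new(Expr::Col("5")),
            )),
            Box::new(Expr::Binary(
                ">",
                Box::new(Expr::Col("a")),
                Box::new(Expr::Col("1")),
            )),
        )),
    );
    println!("{}", write_expr(&e, 0));
    // Prints: (a > 3 OR (b > 5 AND a > 1))
}
```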
2 changes: 1 addition & 1 deletion src/query/functions/tests/it/scalars/testdata/boolean.txt
@@ -65,7 +65,7 @@ error:
 --> SQL:1:1
   |
 1 | 'a' and 1
-  | ^^^ cannot parse to type `BOOLEAN` while evaluating function `to_boolean('a')` in expr `to_boolean('a')`, during run expr: `to_boolean('a') AND to_boolean(1)`
+  | ^^^ cannot parse to type `BOOLEAN` while evaluating function `to_boolean('a')` in expr `to_boolean('a')`, during run expr: `(to_boolean('a') AND to_boolean(1))`



@@ -35,8 +35,19 @@ impl HashJoinProbeState {
         if build_num_rows == 0 || input_num_rows == 0 {
             return Ok(vec![]);
         }
-        let probe_block = input.project(&self.probe_projections);
+        let mut probe_block = input.project(&self.probe_projections);
         let build_block = DataBlock::concat(build_blocks)?;
+        if build_num_rows == 1 {
+            for col in build_block.columns() {
+                let value_ref = col.value.as_ref();
+                let scalar = unsafe { value_ref.index_unchecked(0) };
+                probe_block.add_column(BlockEntry::new(
+                    col.data_type.clone(),
+                    Value::Scalar(scalar.to_owned()),
+                ));
+            }
+            return Ok(vec![probe_block]);
+        }
         let mut result_blocks = Vec::with_capacity(input_num_rows);
         for i in 0..input_num_rows {
             result_blocks.push(self.merge_with_constant_block(
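The hash-join change above adds a fast path: when the build side holds exactly one row, each build column is attached to the probe block as a single constant scalar instead of being merged with every probe row. Below is a simplified, self-contained illustration of that idea; the `Block` and `Column` types are stand-ins, not Databend's `DataBlock`, `BlockEntry`, or `Value::Scalar`.

```rust
// Toy column/block model: a column is either fully materialized or a single
// constant value logically repeated for every row.
#[derive(Debug)]
enum Column {
    Full(Vec<i64>),
    Constant(i64),
}

#[derive(Debug)]
struct Block {
    num_rows: usize,
    columns: Vec<Column>,
}

// Fast path sketched from the diff: read the single build-side value of each
// column and append it to the probe block as a constant, avoiding a
// per-probe-row merge.
fn attach_single_row_build(probe: &mut Block, build: &Block) {
    assert_eq!(build.num_rows, 1, "fast path applies only to a one-row build side");
    for col in &build.columns {
        let scalar = match col {
            Column::Full(values) => values[0],
            Column::Constant(v) => *v,
        };
        probe.columns.push(Column::Constant(scalar));
    }
}

fn main() {
    let mut probe = Block {
        num_rows: 3,
        columns: vec![Column::Full(vec![10, 20, 30])],
    };
    let build = Block {
        num_rows: 1,
        columns: vec![Column::Full(vec![7])],
    };
    attach_single_row_build(&mut probe, &build);
    // The build column rides along as Constant(7) for all probe rows.
    println!("{} probe rows, columns: {:?}", probe.num_rows, probe.columns);
}
```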
10 changes: 5 additions & 5 deletions tests/sqllogictests/suites/mode/cluster/explain_v2.test
@@ -42,7 +42,7 @@ Exchange
├── exchange type: Merge
└── Filter
├── output columns: [t1.a (#0), t1.b (#1), t2.b (#3), t2.a (#2)]
├── filters: [t1.a (#0) > 3 OR t2.a (#2) > 5 AND t1.a (#0) > 1]
├── filters: [(t1.a (#0) > 3 OR (t2.a (#2) > 5 AND t1.a (#0) > 1))]
├── estimated rows: 99.60
└── HashJoin
├── output columns: [t1.a (#0), t1.b (#1), t2.b (#3), t2.a (#2)]
Expand All @@ -56,7 +56,7 @@ Exchange
│ ├── exchange type: Broadcast
│ └── Filter
│ ├── output columns: [t2.a (#2), t2.b (#3)]
│ ├── filters: [t2.a (#2) > 3 OR t2.a (#2) > 1]
│ ├── filters: [(t2.a (#2) > 3 OR t2.a (#2) > 1)]
│ ├── estimated rows: 99.92
│ └── TableScan
│ ├── table: default.default.t2
Expand All @@ -66,11 +66,11 @@ Exchange
│ ├── partitions total: 3
│ ├── partitions scanned: 3
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 3 to 3>]
│ ├── push downs: [filters: [t2.a (#2) > 3 OR t2.a (#2) > 1], limit: NONE]
│ ├── push downs: [filters: [(t2.a (#2) > 3 OR t2.a (#2) > 1)], limit: NONE]
│ └── estimated rows: 100.00
└── Filter(Probe)
├── output columns: [t1.a (#0), t1.b (#1)]
├── filters: [t1.a (#0) > 3 OR t1.a (#0) > 1]
├── filters: [(t1.a (#0) > 3 OR t1.a (#0) > 1)]
├── estimated rows: 99.92
└── TableScan
├── table: default.default.t1
Expand All @@ -80,7 +80,7 @@ Exchange
├── partitions total: 3
├── partitions scanned: 3
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 3 to 3>]
├── push downs: [filters: [t1.a (#0) > 3 OR t1.a (#0) > 1], limit: NONE]
├── push downs: [filters: [(t1.a (#0) > 3 OR t1.a (#0) > 1)], limit: NONE]
└── estimated rows: 100.00

query T
@@ -491,7 +491,7 @@ Sort
├── estimated rows: 0.00
└── Filter
├── output columns: [onebrc.station_name (#0), onebrc.measurement (#1)]
├── filters: [is_true(onebrc.measurement (#1) > 0), is_true(onebrc.station_name (#0) = 'Beijing'), is_true(onebrc.measurement (#1) = 1 OR onebrc.measurement (#1) = 2)]
├── filters: [is_true(onebrc.measurement (#1) > 0), is_true(onebrc.station_name (#0) = 'Beijing'), is_true((onebrc.measurement (#1) = 1 OR onebrc.measurement (#1) = 2))]
├── estimated rows: 0.00
└── TableScan
├── table: default.test_index_db.onebrc
Expand All @@ -500,9 +500,9 @@ Sort
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [and_filters(and_filters(onebrc.measurement (#1) > 0, onebrc.station_name (#0) = 'Beijing'), onebrc.measurement (#1) = 1 OR onebrc.measurement (#1) = 2)], limit: NONE]
├── push downs: [filters: [and_filters(and_filters(onebrc.measurement (#1) > 0, onebrc.station_name (#0) = 'Beijing'), (onebrc.measurement (#1) = 1 OR onebrc.measurement (#1) = 2))], limit: NONE]
├── aggregating index: [SELECT station_name, measurement, COUNT(), COUNT(measurement), MAX(measurement), MIN(measurement), SUM(measurement) FROM test_index_db.onebrc GROUP BY station_name, measurement]
├── rewritten query: [selection: [index_col_0 (#0), index_col_1 (#1), index_col_5 (#5), index_col_6 (#6), index_col_3 (#3), index_col_4 (#4)], filter: is_true(index_col_1 (#1) > CAST(0 AS Float64 NULL) AND index_col_0 (#0) = CAST('Beijing' AS String NULL) AND (index_col_1 (#1) = CAST(1 AS Float64 NULL) OR index_col_1 (#1) = CAST(2 AS Float64 NULL)))]
├── rewritten query: [selection: [index_col_0 (#0), index_col_1 (#1), index_col_5 (#5), index_col_6 (#6), index_col_3 (#3), index_col_4 (#4)], filter: is_true(((index_col_1 (#1) > CAST(0 AS Float64 NULL) AND index_col_0 (#0) = CAST('Beijing' AS String NULL)) AND (index_col_1 (#1) = CAST(1 AS Float64 NULL) OR index_col_1 (#1) = CAST(2 AS Float64 NULL))))]
└── estimated rows: 0.00

statement ok
@@ -620,7 +620,7 @@ Sort
├── estimated rows: 0.00
└── Filter
├── output columns: [onebrc.station_name (#0), onebrc.measurement (#1)]
├── filters: [is_true(onebrc.station_name (#0) = 'Paris' OR onebrc.station_name (#0) = 'Beijing')]
├── filters: [is_true((onebrc.station_name (#0) = 'Paris' OR onebrc.station_name (#0) = 'Beijing'))]
├── estimated rows: 0.00
└── TableScan
├── table: default.test_index_db.onebrc
Expand All @@ -629,7 +629,7 @@ Sort
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [is_true(onebrc.station_name (#0) = 'Paris' OR onebrc.station_name (#0) = 'Beijing')], limit: NONE]
├── push downs: [filters: [is_true((onebrc.station_name (#0) = 'Paris' OR onebrc.station_name (#0) = 'Beijing'))], limit: NONE]
├── aggregating index: [SELECT station_name, COUNT(), COUNT(measurement), MAX(measurement), MIN(measurement), SUM(measurement) FROM test_index_db.onebrc WHERE station_name IN('Paris', 'Beijing') GROUP BY station_name]
├── rewritten query: [selection: [index_col_0 (#0), index_col_4 (#4), index_col_5 (#5), index_col_2 (#2), index_col_3 (#3)]]
└── estimated rows: 0.00
@@ -538,7 +538,7 @@ explain select * from t left join t t1 on t.a = t1.a where t1.a <= 1 or t.a < 1
----
Filter
├── output columns: [t.a (#0), t1.a (#1)]
├── filters: [is_true(t1.a (#1) <= 1 OR t.a (#0) < 1)]
├── filters: [is_true((t1.a (#1) <= 1 OR t.a (#0) < 1))]
├── estimated rows: 1.90
└── HashJoin
├── output columns: [t.a (#0), t1.a (#1)]
@@ -573,7 +573,7 @@ explain select * from t left join t t1 on t.a = t1.a where t1.a <= 1 or (t.a > 1
----
Filter
├── output columns: [t.a (#0), t1.a (#1)]
├── filters: [is_true(t1.a (#1) <= 1 OR t.a (#0) > 1 AND t1.a (#1) > 1)]
├── filters: [is_true((t1.a (#1) <= 1 OR (t.a (#0) > 1 AND t1.a (#1) > 1)))]
├── estimated rows: 6.13
└── HashJoin
├── output columns: [t.a (#0), t1.a (#1)]
Expand All @@ -584,7 +584,7 @@ Filter
├── estimated rows: 7.47
├── Filter(Build)
│ ├── output columns: [t1.a (#1)]
│ ├── filters: [is_true(t1.a (#1) <= 1 OR t1.a (#1) > 1)]
│ ├── filters: [is_true((t1.a (#1) <= 1 OR t1.a (#1) > 1))]
│ ├── estimated rows: 8.20
│ └── TableScan
│ ├── table: default.eliminate_outer_join.t
Expand All @@ -594,11 +594,11 @@ Filter
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [is_true(t.a (#1) <= 1 OR t.a (#1) > 1)], limit: NONE]
│ ├── push downs: [filters: [is_true((t.a (#1) <= 1 OR t.a (#1) > 1))], limit: NONE]
│ └── estimated rows: 10.00
└── Filter(Probe)
├── output columns: [t.a (#0)]
├── filters: [is_true(t.a (#0) <= 1 OR t.a (#0) > 1)]
├── filters: [is_true((t.a (#0) <= 1 OR t.a (#0) > 1))]
├── estimated rows: 8.20
└── TableScan
├── table: default.eliminate_outer_join.t
Expand All @@ -608,15 +608,15 @@ Filter
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [is_true(t.a (#0) <= 1 OR t.a (#0) > 1)], limit: NONE]
├── push downs: [filters: [is_true((t.a (#0) <= 1 OR t.a (#0) > 1))], limit: NONE]
└── estimated rows: 10.00

query T
explain select * from t left join t t1 on t.a = t1.a where t1.a <= 1 or (t.a > 1 and t.a < 2)
----
Filter
├── output columns: [t.a (#0), t1.a (#1)]
├── filters: [is_true(t1.a (#1) <= 1 OR t.a (#0) > 1 AND t.a (#0) < 2)]
├── filters: [is_true((t1.a (#1) <= 1 OR (t.a (#0) > 1 AND t.a (#0) < 2)))]
├── estimated rows: 2.80
└── HashJoin
├── output columns: [t.a (#0), t1.a (#1)]