From d24909766d810de5d22213cd00a600f778125c17 Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Mon, 14 Oct 2024 22:58:32 +0800
Subject: [PATCH 1/2] chore(query): add more logs on aggregation (#16552)

* add consume_convert_blocks

* update

* add tests

* add more logs

* add more logs
---
 Cargo.lock                                    | 29 -----------
 Cargo.toml                                    | 18 +++++--
 .../src/aggregate/aggregate_hashtable.rs      |  2 +
 src/query/expression/src/block.rs             | 12 +++++
 src/query/expression/src/converts/arrow/to.rs |  2 +-
 src/query/expression/tests/it/sort.rs         | 50 +++++++++++++++++++
 src/query/functions/Cargo.toml                |  1 -
 .../src/aggregates/aggregate_min_max_any.rs   | 31 ++++++++++++
 .../functions/src/aggregates/aggregate_sum.rs | 42 +++++++++++-----
 .../src/aggregates/aggregate_unary.rs         | 30 +++++------
 .../aggregator/transform_aggregate_partial.rs | 37 +++++++++++++-
 .../aggregator/transform_group_by_partial.rs  | 44 ++++++++++++++--
 .../aggregator/transform_single_key.rs        | 37 +++++++++++++-
 .../hash_join/transform_hash_join_probe.rs    |  2 +-
 .../transforms/window/transform_window.rs     |  2 +-
 .../mysql/writers/query_result_writer.rs      |  2 +-
 .../fuse/operations/internal_column.rs        |  4 +-
 .../storages/system/src/temp_files_table.rs   |  2 +-
 18 files changed, 271 insertions(+), 76 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d6f1aee416be1..5b5e4c190107c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3577,7 +3577,6 @@ dependencies = [
  "libm",
  "match-template",
  "md-5",
- "multiversion",
  "naive-cityhash",
  "num-traits",
  "once_cell",
@@ -10209,28 +10208,6 @@ version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
 
-[[package]]
-name = "multiversion"
-version = "0.7.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c4851161a11d3ad0bf9402d90ffc3967bf231768bfd7aeb61755ad06dbf1a142"
-dependencies = [
- "multiversion-macros",
- "target-features",
-]
-
-[[package]]
-name = "multiversion-macros"
-version = "0.7.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79a74ddee9e0c27d2578323c13905793e91622148f138ba29738f9dddb835e90"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 1.0.109",
- "target-features",
-]
-
 [[package]]
 name = "mur3"
 version = "0.1.0"
@@ -14964,12 +14941,6 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
 
-[[package]]
-name = "target-features"
-version = "0.1.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c1bbb9f3c5c463a01705937a24fdabc5047929ac764b2d5b9cf681c1f5041ed5"
-
 [[package]]
 name = "target-lexicon"
 version = "0.12.16"
diff --git a/Cargo.toml b/Cargo.toml
index d3d9aad56f3f9..c8273d4b01bd5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -351,18 +351,26 @@ useless_format = "allow"
 mutable_key_type = "allow"
 result_large_err = "allow"
 
+## DON'T DELETE THIS: If we want best performance, we should use this profile, but it will take longer to compile.
+## Test SQL:
+## select sum(number) from numbers_mt(10000000000); ~ 3x performance
+## select max(number) from numbers_mt(10000000000); ~ 3x performance
+# [profile.release]
+# debug = 1
+# lto = "thin"
+# overflow-checks = false
+# incremental = false
+# codegen-units = 1
+
 [profile.release]
 debug = 1
 lto = "thin"
 overflow-checks = false
+opt-level = "s" ## defaults to 3
 incremental = false
-opt-level = "s"
-
-# codegen-units = 1 # Reduce number of codegen units to increase optimizations.
 
 # [profile.release.package]
-# arrow2 = { codegen-units = 4 }
-# common-functions = { codegen-units = 16 }
+# databend-common-arrow = { codegen-units = 16 }
 # databend-query = { codegen-units = 4 }
 # databend-binaries = { codegen-units = 4 }
diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs
index 00c48e83293eb..07d403518e0b1 100644
--- a/src/query/expression/src/aggregate/aggregate_hashtable.rs
+++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs
@@ -49,6 +49,7 @@ pub struct AggregateHashTable {
     // use for append rows directly during deserialize
     pub direct_append: bool,
     pub config: HashTableConfig,
+    current_radix_bits: u64,
     entries: Vec<Entry>,
     count: usize,
 
@@ -585,6 +586,7 @@ impl AggregateHashTable {
             .iter()
             .map(|arena| arena.allocated_bytes())
             .sum::<usize>()
+            + self.entries.len() * std::mem::size_of::<Entry>()
     }
 }
diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs
index 820d5841d4a3e..866f976644f34 100644
--- a/src/query/expression/src/block.rs
+++ b/src/query/expression/src/block.rs
@@ -240,6 +240,18 @@ impl DataBlock {
         self.columns().iter().map(|entry| entry.memory_size()).sum()
     }
 
+    pub fn consume_convert_to_full(self) -> Self {
+        if self
+            .columns()
+            .iter()
+            .all(|entry| entry.value.as_column().is_some())
+        {
+            return self;
+        }
+
+        self.convert_to_full()
+    }
+
     pub fn convert_to_full(&self) -> Self {
         let columns = self
             .columns()
diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs
index 98379b1cd1f07..391d2893f596e 100644
--- a/src/query/expression/src/converts/arrow/to.rs
+++ b/src/query/expression/src/converts/arrow/to.rs
@@ -101,7 +101,7 @@ impl DataBlock {
         let arrow_schema = table_schema_to_arrow_schema(table_schema);
         let mut arrays = Vec::with_capacity(self.columns().len());
         for (entry, arrow_field) in self
-            .convert_to_full()
+            .consume_convert_to_full()
             .columns()
             .iter()
             .zip(arrow_schema.fields())
diff --git a/src/query/expression/tests/it/sort.rs b/src/query/expression/tests/it/sort.rs
index 9c72d7d6ab286..1d1dbf6268440 100644
--- a/src/query/expression/tests/it/sort.rs
+++ b/src/query/expression/tests/it/sort.rs
@@ -24,6 +24,7 @@ use databend_common_expression::FromData;
 use databend_common_expression::SortColumnDescription;
 
 use crate::common::new_block;
+use crate::rand_block_for_all_types;
 
 #[test]
 fn test_block_sort() -> Result<()> {
@@ -201,3 +202,52 @@ fn test_block_sort() -> Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn sort_concat() {
+    // Sort(Sort A || Sort B) = Sort (A || B)
+    use databend_common_expression::DataBlock;
+    use itertools::Itertools;
+    use rand::seq::SliceRandom;
+    use rand::Rng;
+
+    let mut rng = rand::thread_rng();
+    let num_blocks = 100;
+
+    for _i in 0..num_blocks {
+        let block_a = rand_block_for_all_types(rng.gen_range(0..100));
+        let block_b = rand_block_for_all_types(rng.gen_range(0..100));
+
+        let mut sort_index: Vec<usize> = (0..block_a.num_columns()).collect();
+        sort_index.shuffle(&mut rng);
+
+        let sort_desc = sort_index
+            .iter()
+            .map(|i| SortColumnDescription {
+                offset: *i,
+                asc: rng.gen_bool(0.5),
+                nulls_first: rng.gen_bool(0.5),
+                is_nullable: rng.gen_bool(0.5),
+            })
+            .collect_vec();
+
+        let concat_ab_0 = DataBlock::concat(&[block_a.clone(), block_b.clone()]).unwrap();
+
+        let sort_a = DataBlock::sort(&block_a, &sort_desc, None).unwrap();
+        let sort_b = DataBlock::sort(&block_b, &sort_desc, None).unwrap();
+        let concat_ab_1 = DataBlock::concat(&[sort_a, sort_b]).unwrap();
+
+        let block_1 = DataBlock::sort(&concat_ab_0, &sort_desc, None).unwrap();
+        let block_2 = DataBlock::sort(&concat_ab_1, &sort_desc, None).unwrap();
+
+        assert_eq!(block_1.num_columns(), block_2.num_columns());
+        assert_eq!(block_1.num_rows(), block_2.num_rows());
+
+        let columns_1 = block_1.columns();
+        let columns_2 = block_2.columns();
+        for idx in 0..columns_1.len() {
+            assert_eq!(columns_1[idx].data_type, columns_2[idx].data_type);
+            assert_eq!(columns_1[idx].value, columns_2[idx].value);
+        }
+    }
+}
diff --git a/src/query/functions/Cargo.toml b/src/query/functions/Cargo.toml
index 16e6f6c6d2e41..615ff7f9d0d82 100644
--- a/src/query/functions/Cargo.toml
+++ b/src/query/functions/Cargo.toml
@@ -47,7 +47,6 @@ lexical-core = "0.8.5"
 libm = "0.2.6"
 match-template = { workspace = true }
 md-5 = "0.10.5"
-multiversion = "0.7.4"
 naive-cityhash = "0.2.0"
 num-traits = "0.2.15"
 once_cell = { workspace = true }
diff --git a/src/query/functions/src/aggregates/aggregate_min_max_any.rs b/src/query/functions/src/aggregates/aggregate_min_max_any.rs
index 9efdc985f13d9..6ab35d792d80f 100644
--- a/src/query/functions/src/aggregates/aggregate_min_max_any.rs
+++ b/src/query/functions/src/aggregates/aggregate_min_max_any.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 
 use borsh::BorshDeserialize;
 use borsh::BorshSerialize;
+use databend_common_arrow::arrow::bitmap::Bitmap;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::decimal::*;
@@ -92,6 +93,36 @@ where
         Ok(())
     }
 
+    fn add_batch(
+        &mut self,
+        other: T::Column,
+        validity: Option<&Bitmap>,
+        function_data: Option<&dyn FunctionData>,
+    ) -> Result<()> {
+        let column_len = T::column_len(&other);
+        if column_len == 0 {
+            return Ok(());
+        }
+
+        let column_iter = T::iter_column(&other);
+        if let Some(validity) = validity {
+            if validity.unset_bits() == column_len {
+                return Ok(());
+            }
+            for (data, valid) in column_iter.zip(validity.iter()) {
+                if valid {
+                    let _ = self.add(data, function_data);
+                }
+            }
+        } else {
+            let v = column_iter.reduce(|l, r| if !C::change_if(&l, &r) { l } else { r });
+            if let Some(v) = v {
+                let _ = self.add(v, function_data);
+            }
+        }
+        Ok(())
+    }
+
     fn merge(&mut self, rhs: &Self) -> Result<()> {
         if let Some(v) = &rhs.value {
             self.add(T::to_scalar_ref(v), None)?;
diff --git a/src/query/functions/src/aggregates/aggregate_sum.rs b/src/query/functions/src/aggregates/aggregate_sum.rs
index 116ba6e46c46f..355d8dfa8a410 100644
--- a/src/query/functions/src/aggregates/aggregate_sum.rs
+++ b/src/query/functions/src/aggregates/aggregate_sum.rs
@@ -15,6 +15,7 @@
 use borsh::BorshDeserialize;
 use borsh::BorshSerialize;
 use databend_common_arrow::arrow::bitmap::Bitmap;
+use databend_common_arrow::arrow::buffer::Buffer;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::decimal::*;
@@ -80,21 +81,33 @@ where
     }
 }
 
-#[multiversion::multiversion(targets("x86_64+avx", "x86_64+sse"))]
-fn sum_batch<T, N>(other: T::Column) -> N::Scalar
+// #[multiversion::multiversion(targets("x86_64+avx", "x86_64+sse"))]
+#[inline]
+pub fn sum_batch<T, TSum>(inner: Buffer<T>, validity: Option<&Bitmap>) -> TSum
 where
-    T: ValueType + Sync + Send,
-    N: ValueType,
-    T::Scalar: Number + AsPrimitive<N::Scalar>,
-    N::Scalar: Number + AsPrimitive<N::Scalar> + std::ops::AddAssign,
-    for<'a> T::ScalarRef<'a>: Number + AsPrimitive<N::Scalar>,
+    T: Number + AsPrimitive<TSum>,
+    TSum: Number + std::ops::AddAssign,
 {
-    // use temp variable to hint the compiler to unroll the loop
-    let mut sum = N::Scalar::default();
-    for value in T::iter_column(&other) {
-        sum += value.as_();
+    match validity {
+        Some(v) if v.unset_bits() > 0 => {
+            let mut sum = TSum::default();
+            inner.iter().zip(v.iter()).for_each(|(t, b)| {
+                if b {
+                    sum += t.as_();
+                }
+            });
+
+            sum
+        }
+        _ => {
+            let mut sum = TSum::default();
+            inner.iter().for_each(|t| {
+                sum += t.as_();
+            });
+
+            sum
+        }
     }
-    sum
 }
 
 impl<T, N> UnaryState<T, N> for NumberSumState<N>
 where
@@ -117,9 +130,12 @@
     fn add_batch(
         &mut self,
         other: T::Column,
+        validity: Option<&Bitmap>,
         _function_data: Option<&dyn FunctionData>,
     ) -> Result<()> {
-        self.value += sum_batch::<T::Scalar, N::Scalar>(other);
+        let col = T::upcast_column(other);
+        let buffer = NumberType::<T::Scalar>::try_downcast_column(&col).unwrap();
+        self.value += sum_batch::<T::Scalar, N::Scalar>(buffer, validity);
         Ok(())
     }
diff --git a/src/query/functions/src/aggregates/aggregate_unary.rs b/src/query/functions/src/aggregates/aggregate_unary.rs
index fa85cffcce0b8..5ac0bc5a4d22b 100644
--- a/src/query/functions/src/aggregates/aggregate_unary.rs
+++ b/src/query/functions/src/aggregates/aggregate_unary.rs
@@ -47,10 +47,22 @@ where
     fn add_batch(
         &mut self,
         other: T::Column,
+        validity: Option<&Bitmap>,
         function_data: Option<&dyn FunctionData>,
     ) -> Result<()> {
-        for value in T::iter_column(&other) {
-            self.add(value, function_data)?;
+        match validity {
+            Some(validity) => {
+                for (data, valid) in T::iter_column(&other).zip(validity.iter()) {
+                    if valid {
+                        self.add(data, function_data)?;
+                    }
+                }
+            }
+            None => {
+                for value in T::iter_column(&other) {
+                    self.add(value, function_data)?;
+                }
+            }
         }
         Ok(())
     }
@@ -206,18 +218,8 @@
     ) -> Result<()> {
         let column = T::try_downcast_column(&columns[0]).unwrap();
         let state: &mut S = place.get::<S>();
-        match validity {
-            Some(bitmap) if bitmap.unset_bits() > 0 => {
-                let column_iter = T::iter_column(&column);
-                for (value, is_valid) in column_iter.zip(bitmap.iter()) {
-                    if is_valid {
-                        state.add(value, self.function_data.as_deref())?;
-                    }
-                }
-                Ok(())
-            }
-            _ => state.add_batch(column, self.function_data.as_deref()),
-        }
+
+        state.add_batch(column, validity, self.function_data.as_deref())
     }
 
     fn accumulate_row(&self, place: StateAddr, columns: InputColumns, row: usize) -> Result<()> {
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
index 28d6dea1eedd1..cbd229d9b7ff7 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::time::Instant;
 use std::vec;
 
 use bumpalo::Bump;
@@ -111,6 +112,10 @@ pub struct TransformPartialAggregate {
     hash_table: HashTable<Method>,
     probe_state: ProbeState,
     params: Arc<AggregatorParams>,
+    start: Instant,
+    first_block_start: Option<Instant>,
+    processed_bytes: usize,
+    processed_rows: usize,
 }
 
 impl<Method: HashMethodBounds> TransformPartialAggregate<Method> {
@@ -164,6 +169,10 @@
                 hash_table,
                 probe_state: ProbeState::default(),
                 settings: AggregateSettings::try_from(ctx)?,
+                start: Instant::now(),
+                first_block_start: None,
+                processed_bytes: 0,
+                processed_rows: 0,
             },
         ))
     }
@@ -239,10 +248,16 @@
             .map(|index| index.is_agg)
             .unwrap_or_default();
 
-        let block = block.convert_to_full();
+        let block = block.consume_convert_to_full();
         let group_columns = InputColumns::new_block_proxy(&self.params.group_columns, &block);
         let rows_num = block.num_rows();
 
+        self.processed_bytes += block.memory_size();
+        self.processed_rows += rows_num;
+        if self.first_block_start.is_none() {
+            self.first_block_start = Some(Instant::now());
+        }
+
         {
             match &mut self.hash_table {
                 HashTable::MovedOut => unreachable!(),
@@ -449,6 +464,26 @@ impl AccumulatingTransform for TransformPartialAggrega
                 HashTable::AggregateHashTable(hashtable) => {
                     let partition_count = hashtable.payload.partition_count();
                     let mut blocks = Vec::with_capacity(partition_count);
+
+                    log::info!(
+                        "Aggregated {} to {} rows in {} sec (real: {}). ({} rows/sec, {}/sec, {})",
+                        self.processed_rows,
+                        hashtable.payload.len(),
+                        self.start.elapsed().as_secs_f64(),
+                        if let Some(t) = &self.first_block_start {
+                            t.elapsed().as_secs_f64()
+                        } else {
+                            self.start.elapsed().as_secs_f64()
+                        },
+                        convert_number_size(
+                            self.processed_rows as f64 / self.start.elapsed().as_secs_f64()
+                        ),
+                        convert_byte_size(
+                            self.processed_bytes as f64 / self.start.elapsed().as_secs_f64()
+                        ),
+                        convert_byte_size(self.processed_bytes as f64),
+                    );
+
                     for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() {
                         if payload.len() != 0 {
                             blocks.push(DataBlock::empty_with_meta(
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
index e4062b84a70df..7878df9e5ef87 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::time::Instant;
 use std::vec;
 
 use bumpalo::Bump;
@@ -107,6 +108,11 @@ pub struct TransformPartialGroupBy {
     probe_state: ProbeState,
     settings: GroupBySettings,
     params: Arc<AggregatorParams>,
+
+    start: Instant,
+    first_block_start: Option<Instant>,
+    processed_rows: usize,
+    processed_bytes: usize,
 }
 
 impl<Method: HashMethodBounds> TransformPartialGroupBy<Method> {
@@ -142,6 +148,10 @@
                 probe_state: ProbeState::default(),
                 params,
                 settings: GroupBySettings::try_from(ctx)?,
+                start: Instant::now(),
+                first_block_start: None,
+                processed_bytes: 0,
+                processed_rows: 0,
             },
         ))
     }
@@ -151,12 +161,19 @@ impl AccumulatingTransform for TransformPartialGroupBy
     const NAME: &'static str = "TransformPartialGroupBy";
 
     fn transform(&mut self, block: DataBlock) -> Result<Vec<DataBlock>> {
-        let block = block.convert_to_full();
+        let block = block.consume_convert_to_full();
+
+        let rows_num = block.num_rows();
+
+        self.processed_bytes += block.memory_size();
+        self.processed_rows += rows_num;
+        if self.first_block_start.is_none() {
+            self.first_block_start = Some(Instant::now());
+        }
+
         let group_columns = InputColumns::new_block_proxy(&self.params.group_columns, &block);
 
         {
-            let rows_num = block.num_rows();
-
             match &mut self.hash_table {
                 HashTable::MovedOut => unreachable!(),
                 HashTable::HashTable(cell) => {
@@ -305,6 +322,26 @@ impl AccumulatingTransform for TransformPartialGroupBy
                 HashTable::AggregateHashTable(hashtable) => {
                     let partition_count = hashtable.payload.partition_count();
                     let mut blocks = Vec::with_capacity(partition_count);
+
+                    log::info!(
+                        "Aggregated {} to {} rows in {} sec (real: {}). ({} rows/sec, {}/sec, {})",
+                        self.processed_rows,
+                        hashtable.payload.len(),
+                        self.start.elapsed().as_secs_f64(),
+                        if let Some(t) = &self.first_block_start {
+                            t.elapsed().as_secs_f64()
+                        } else {
+                            self.start.elapsed().as_secs_f64()
+                        },
+                        convert_number_size(
+                            self.processed_rows as f64 / self.start.elapsed().as_secs_f64()
+                        ),
+                        convert_byte_size(
+                            self.processed_bytes as f64 / self.start.elapsed().as_secs_f64()
+                        ),
+                        convert_byte_size(self.processed_bytes as f64),
+                    );
+
                     for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() {
                         if payload.len() != 0 {
                             blocks.push(DataBlock::empty_with_meta(
@@ -316,7 +353,6 @@ impl AccumulatingTransform for TransformPartialGroupBy
                         ));
                     }
                 }
-
                 blocks
             }
         })
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs
index de05ea7d2e316..1de36a979b1fb 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs
@@ -15,9 +15,12 @@
 use std::alloc::Layout;
 use std::borrow::BorrowMut;
 use std::sync::Arc;
+use std::time::Instant;
 use std::vec;
 
 use bumpalo::Bump;
+use databend_common_base::base::convert_byte_size;
+use databend_common_base::base::convert_number_size;
 use databend_common_catalog::plan::AggIndexMeta;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -47,6 +50,11 @@ pub struct PartialSingleStateAggregator {
     places: Vec<StateAddr>,
     arg_indices: Vec<Vec<usize>>,
     funcs: Vec<AggregateFunctionRef>,
+
+    start: Instant,
+    first_block_start: Option<Instant>,
+    rows: usize,
+    bytes: usize,
 }
 
 impl PartialSingleStateAggregator {
@@ -76,6 +84,10 @@
             places,
             funcs: params.aggregate_functions.clone(),
             arg_indices: params.aggregate_functions_arguments.clone(),
+            start: Instant::now(),
+            first_block_start: None,
+            rows: 0,
+            bytes: 0,
         })
     }
 }
 
@@ -84,13 +96,17 @@ impl AccumulatingTransform for PartialSingleStateAggregator {
     const NAME: &'static str = "AggregatorPartialTransform";
 
     fn transform(&mut self, block: DataBlock) -> Result<Vec<DataBlock>> {
+        if self.first_block_start.is_none() {
+            self.first_block_start = Some(Instant::now());
+        }
+
         let is_agg_index_block = block
             .get_meta()
             .and_then(AggIndexMeta::downcast_ref_from)
             .map(|index| index.is_agg)
             .unwrap_or_default();
 
-        let block = block.convert_to_full();
+        let block = block.consume_convert_to_full();
 
         for (idx, func) in self.funcs.iter().enumerate() {
             let place = self.places[idx];
@@ -107,6 +123,9 @@ impl AccumulatingTransform for PartialSingleStateAggregator {
             }
         }
 
+        self.rows += block.num_rows();
+        self.bytes += block.memory_size();
+
         Ok(vec![])
     }
 
@@ -137,6 +156,20 @@ impl AccumulatingTransform for PartialSingleStateAggregator {
             }
         }
 
+        log::info!(
+            "Aggregated {} to 1 row in {} sec (real: {}). ({} rows/sec, {}/sec, {})",
+            self.rows,
+            self.start.elapsed().as_secs_f64(),
+            if let Some(t) = &self.first_block_start {
+                t.elapsed().as_secs_f64()
+            } else {
+                self.start.elapsed().as_secs_f64()
+            },
+            convert_number_size(self.rows as f64 / self.start.elapsed().as_secs_f64()),
+            convert_byte_size(self.bytes as f64 / self.start.elapsed().as_secs_f64()),
+            convert_byte_size(self.bytes as _),
+        );
+
         Ok(generate_data_block)
     }
 }
@@ -195,7 +228,7 @@ impl AccumulatingTransform for FinalSingleStateAggregator
 
     fn transform(&mut self, block: DataBlock) -> Result<Vec<DataBlock>> {
         if !block.is_empty() {
-            let block = block.convert_to_full();
+            let block = block.consume_convert_to_full();
 
             for (index, _) in self.funcs.iter().enumerate() {
                 let binary_array = block.get_by_offset(index).value.as_column().unwrap();
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs
index fb5f9a649b358..c478a19f5ebf1 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs
@@ -352,7 +352,7 @@ impl Processor for TransformHashJoinProbe {
                 {
                     self.probe_hash_table(data_block)?;
                 } else if let Some(data_block) = self.input_data_blocks.pop_front() {
-                    let data_block = data_block.convert_to_full();
+                    let data_block = data_block.consume_convert_to_full();
                     self.probe_hash_table(data_block)?;
                 }
             }
diff --git a/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs b/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs
index 21371abcec8e9..a168b43cacb17 100644
--- a/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs
+++ b/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs
@@ -1005,7 +1005,7 @@ where T: Number + ResultTypeOfUnary
         let num_rows = data.num_rows();
         if num_rows != 0 {
             self.blocks.push_back(WindowBlock {
-                block: data.convert_to_full(),
+                block: data.consume_convert_to_full(),
                 builder: ColumnBuilder::with_capacity(&self.func.return_type()?, num_rows),
             });
         }
diff --git a/src/query/service/src/servers/mysql/writers/query_result_writer.rs b/src/query/service/src/servers/mysql/writers/query_result_writer.rs
index 6beefcb0525e6..d0251eefa50a3 100644
--- a/src/query/service/src/servers/mysql/writers/query_result_writer.rs
+++ b/src/query/service/src/servers/mysql/writers/query_result_writer.rs
@@ -242,7 +242,7 @@ impl<'a, W: AsyncWrite + Send + Unpin> DFQueryResultWriter<'a, W> {
                     let mut buf = Vec::<u8>::new();
 
                     let columns = block
-                        .convert_to_full()
+                        .consume_convert_to_full()
                         .columns()
                         .iter()
                         .map(|column| column.value.clone().into_column().unwrap())
diff --git a/src/query/service/tests/it/storages/fuse/operations/internal_column.rs b/src/query/service/tests/it/storages/fuse/operations/internal_column.rs
index 5527734d4cb91..a759e15d1eb00 100644
--- a/src/query/service/tests/it/storages/fuse/operations/internal_column.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/internal_column.rs
@@ -71,8 +71,8 @@ fn expected_data_block(
 }
 
 fn check_data_block(expected: Vec<DataBlock>, blocks: Vec<DataBlock>) -> Result<()> {
-    let expected_data_block = DataBlock::concat(&expected)?.convert_to_full();
-    let data_block = DataBlock::concat(&blocks)?.convert_to_full();
+    let expected_data_block = DataBlock::concat(&expected)?.consume_convert_to_full();
+    let data_block = DataBlock::concat(&blocks)?.consume_convert_to_full();
 
     for (expected_column, column) in expected_data_block
         .columns()
diff --git a/src/query/storages/system/src/temp_files_table.rs b/src/query/storages/system/src/temp_files_table.rs
index 34ca6ddfa459a..6c53b84a73d65 100644
--- a/src/query/storages/system/src/temp_files_table.rs
+++ b/src/query/storages/system/src/temp_files_table.rs
@@ -119,7 +119,7 @@ impl AsyncSystemTable for TempFilesTable {
             num_rows,
         );
 
-        Ok(data_block.convert_to_full())
+        Ok(data_block.consume_convert_to_full())
     }
 }

From 86cd6084b1c9ce08791517ab75d9c1ff8e4c6a06 Mon Sep 17 00:00:00 2001
From: Winnie-Hong0927 <136137323+Winnie-Hong0927@users.noreply.github.com>
Date: Tue, 15 Oct 2024 11:11:05 +0800
Subject: [PATCH 2/2] feat(query): Support `show dictionaries` DQL. (#16602)

* feat: show dictionaries stmt, planner.

* feat: add storages dictionaries

* feat: dictionaries_table.

* update: dictionaries_table and binder.

* fix

* fix

* fix and test.

* fmt

* update.

* update binder and test.

* update privilege_access and testdata.
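
Illustrative usage of the new statement (the database and dictionary names below are the ones used in the test added by this patch):

    SHOW DICTIONARIES;
    SHOW DICTIONARIES FROM show_dictionary LIKE 'd%';
    SHOW DICTIONARIES FROM show_dictionary WHERE name = 'd2';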
---
 .../ast/src/ast/statements/dictionary.rs      |  20 ++
 src/query/ast/src/ast/statements/statement.rs |  11 +-
 src/query/ast/src/parser/statement.rs         |  10 +-
 .../src/databases/system/system_database.rs   |   2 +
 .../interpreters/access/privilege_access.rs   |   6 +-
 .../it/storages/testdata/columns_table.txt    |  10 +
 src/query/sql/src/planner/binder/binder.rs    |   2 +-
 .../sql/src/planner/binder/ddl/dictionary.rs  |  52 +++++
 src/query/sql/src/planner/plans/plan.rs       |   1 +
 .../storages/system/src/dictionaries_table.rs | 200 ++++++++++++++++++
 src/query/storages/system/src/lib.rs          |   2 +
 .../06_show/06_0024_show_dictionaries.test    |  65 ++++++
 12 files changed, 367 insertions(+), 14 deletions(-)
 create mode 100644 src/query/storages/system/src/dictionaries_table.rs
 create mode 100644 tests/sqllogictests/suites/base/06_show/06_0024_show_dictionaries.test

diff --git a/src/query/ast/src/ast/statements/dictionary.rs b/src/query/ast/src/ast/statements/dictionary.rs
index 88508fb75c195..2ddf086c7e110 100644
--- a/src/query/ast/src/ast/statements/dictionary.rs
+++ b/src/query/ast/src/ast/statements/dictionary.rs
@@ -19,6 +19,7 @@ use std::fmt::Formatter;
 use derive_visitor::Drive;
 use derive_visitor::DriveMut;
 
+use super::ShowLimit;
 use crate::ast::write_comma_separated_list;
 use crate::ast::write_dot_separated_list;
 use crate::ast::write_space_separated_string_map;
@@ -123,3 +124,22 @@ impl Display for ShowCreateDictionaryStmt {
         )
     }
 }
+
+#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
+pub struct ShowDictionariesStmt {
+    pub database: Option<Identifier>,
+    pub limit: Option<ShowLimit>,
+}
+
+impl Display for ShowDictionariesStmt {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "SHOW DICTIONARIES")?;
+        if let Some(database) = &self.database {
+            write!(f, " FROM {database}")?;
+        }
+        if let Some(limit) = &self.limit {
+            write!(f, " {limit}")?;
+        }
+        Ok(())
+    }
+}
diff --git a/src/query/ast/src/ast/statements/statement.rs b/src/query/ast/src/ast/statements/statement.rs
index 0607f1682d765..945679be3a69f 100644
--- a/src/query/ast/src/ast/statements/statement.rs
+++ b/src/query/ast/src/ast/statements/statement.rs
@@ -158,9 +158,7 @@ pub enum Statement {
     CreateDictionary(CreateDictionaryStmt),
     DropDictionary(DropDictionaryStmt),
     ShowCreateDictionary(ShowCreateDictionaryStmt),
-    ShowDictionaries {
-        show_options: Option<ShowOptions>,
-    },
+    ShowDictionaries(ShowDictionariesStmt),
 
     // Columns
     ShowColumns(ShowColumnsStmt),
@@ -613,12 +611,7 @@ impl Display for Statement {
             Statement::CreateDictionary(stmt) => write!(f, "{stmt}")?,
             Statement::DropDictionary(stmt) => write!(f, "{stmt}")?,
             Statement::ShowCreateDictionary(stmt) => write!(f, "{stmt}")?,
-            Statement::ShowDictionaries { show_options } => {
-                write!(f, "SHOW DICTIONARIES")?;
-                if let Some(show_options) = show_options {
-                    write!(f, " {show_options}")?;
-                }
-            }
+            Statement::ShowDictionaries(stmt) => write!(f, "{stmt}")?,
             Statement::CreateView(stmt) => write!(f, "{stmt}")?,
             Statement::AlterView(stmt) => write!(f, "{stmt}")?,
             Statement::DropView(stmt) => write!(f, "{stmt}")?,
diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs
index f2e50528b5e98..8c547eba458e5 100644
--- a/src/query/ast/src/parser/statement.rs
+++ b/src/query/ast/src/parser/statement.rs
@@ -957,9 +957,15 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
     );
     let show_dictionaries = map(
         rule! {
-            SHOW ~ DICTIONARIES ~ #show_options?
+            SHOW ~ DICTIONARIES ~ ((FROM|IN) ~ #ident)? ~ #show_limit?
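+            // e.g. `SHOW DICTIONARIES FROM show_dictionary LIKE 'd%'`; FROM and IN are interchangeable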
+        },
+        |(_, _, db, limit)| {
+            let database = match db {
+                Some((_, d)) => Some(d),
+                _ => None,
+            };
+            Statement::ShowDictionaries(ShowDictionariesStmt { database, limit })
         },
-        |(_, _, show_options)| Statement::ShowDictionaries { show_options },
     );
     let show_create_dictionary = map(
         rule! {
diff --git a/src/query/service/src/databases/system/system_database.rs b/src/query/service/src/databases/system/system_database.rs
index 0ae6e34ee9dc7..93702af95b023 100644
--- a/src/query/service/src/databases/system/system_database.rs
+++ b/src/query/service/src/databases/system/system_database.rs
@@ -35,6 +35,7 @@ use databend_common_storages_system::ConfigsTable;
 use databend_common_storages_system::ContributorsTable;
 use databend_common_storages_system::CreditsTable;
 use databend_common_storages_system::DatabasesTable;
+use databend_common_storages_system::DictionariesTable;
 use databend_common_storages_system::EnginesTable;
 use databend_common_storages_system::FullStreamsTable;
 use databend_common_storages_system::FunctionsTable;
@@ -144,6 +145,7 @@ impl SystemDatabase {
             ViewsTableWithoutHistory::create(sys_db_meta.next_table_id()),
             TemporaryTablesTable::create(sys_db_meta.next_table_id()),
             ProceduresTable::create(sys_db_meta.next_table_id()),
+            DictionariesTable::create(sys_db_meta.next_table_id()),
         ];
 
         let disable_tables = Self::disable_system_tables();
diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs
index 009e2a8154e42..0eaa433bc235e 100644
--- a/src/query/service/src/interpreters/access/privilege_access.rs
+++ b/src/query/service/src/interpreters/access/privilege_access.rs
@@ -61,10 +61,11 @@ enum ObjectId {
 // some statements like `SELECT 1`, `SHOW USERS`, `SHOW ROLES`, `SHOW TABLES` will be
 // rewritten to the queries on the system tables, we need to skip the privilege check on
 // these tables.
-const SYSTEM_TABLES_ALLOW_LIST: [&str; 19] = [
+const SYSTEM_TABLES_ALLOW_LIST: [&str; 20] = [
     "catalogs",
     "columns",
     "databases",
+    "dictionaries",
     "tables",
     "views",
     "tables_with_history",
@@ -709,7 +710,8 @@ impl AccessChecker for PrivilegeAccess {
             Some(RewriteKind::ShowDatabases)
             | Some(RewriteKind::ShowEngines)
             | Some(RewriteKind::ShowFunctions)
-            | Some(RewriteKind::ShowUserFunctions) => {
+            | Some(RewriteKind::ShowUserFunctions)
+            | Some(RewriteKind::ShowDictionaries(_)) => {
                 return Ok(());
             }
             | Some(RewriteKind::ShowTableFunctions) => {
diff --git a/src/query/service/tests/it/storages/testdata/columns_table.txt b/src/query/service/tests/it/storages/testdata/columns_table.txt
index 614f58070a07d..df5879922802c 100644
--- a/src/query/service/tests/it/storages/testdata/columns_table.txt
+++ b/src/query/service/tests/it/storages/testdata/columns_table.txt
@@ -15,6 +15,8 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'arguments' | 'system' | 'procedures' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'arguments' | 'system' | 'user_functions' | 'Variant' | 'VARIANT' | '' | '' | 'NO' | '' |
 | 'attempt_number' | 'system' | 'task_history' | 'Int32' | 'INT' | '' | '' | 'NO' | '' |
+| 'attribute_names' | 'system' | 'dictionaries' | 'Array(String)' | 'ARRAY(STRING)' | '' | '' | 'NO' | '' |
+| 'attribute_types' | 'system' | 'dictionaries' | 'Array(String)' | 'ARRAY(STRING)' | '' | '' | 'NO' | '' |
 | 'auth_type' | 'system' | 'users' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'auto_increment' | 'information_schema' | 'tables' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' |
 | 'byte_size' | 'system' | 'clustering_history' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
@@ -59,6 +61,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'command' | 'system' | 'processes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'comment' | 'information_schema' | 'statistics' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' |
 | 'comment' | 'system' | 'columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
+| 'comment' | 'system' | 'dictionaries' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'comment' | 'system' | 'notifications' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' |
 | 'comment' | 'system' | 'password_policies' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'comment' | 'system' | 'procedures' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
@@ -82,6 +85,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'create_time' | 'information_schema' | 'tables' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
 | 'created_on' | 'system' | 'background_jobs' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
 | 'created_on' | 'system' | 'background_tasks' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
+| 'created_on' | 'system' | 'dictionaries' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
 | 'created_on' | 'system' | 'indexes' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
 | 'created_on' | 'system' | 'locks' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
 | 'created_on' | 'system' | 'notification_history' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
@@ -116,6 +120,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'data_write_bytes' | 'system' | 'processes' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
 | 'database' | 'system' | 'clustering_history' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'database' | 'system' | 'columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
+| 'database' | 'system' | 'dictionaries' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'database' | 'system' | 'processes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'database' | 'system' | 'streams' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'database' | 'system' | 'streams_terse' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
@@ -231,6 +236,8 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'job_type' | 'system' | 'background_jobs' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' |
 | 'join_spilled_bytes' | 'system' | 'query_log' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
 | 'join_spilled_rows' | 'system' | 'query_log' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
+| 'key_names' | 'system' | 'dictionaries' | 'Array(String)' | 'ARRAY(STRING)' | '' | '' | 'NO' | '' |
+| 'key_types' | 'system' | 'dictionaries' | 'Array(String)' | 'ARRAY(STRING)' | '' | '' | 'NO' | '' |
 | 'keywords' | 'information_schema' | 'keywords' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'kind' | 'system' | 'metrics' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'labels' | 'system' | 'metrics' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
@@ -266,6 +273,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'name' | 'system' | 'contributors' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'name' | 'system' | 'credits' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'name' | 'system' | 'databases' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
+| 'name' | 'system' | 'dictionaries' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'name' | 'system' | 'functions' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'name' | 'system' | 'indexes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'name' | 'system' | 'malloc_stats_totals' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
@@ -386,6 +394,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'session_settings' | 'system' | 'query_log' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'size' | 'system' | 'caches' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
 | 'snapshot_location' | 'system' | 'streams' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' |
+| 'source' | 'system' | 'dictionaries' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'sql' | 'system' | 'query_cache' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
 | 'sql_path' | 'information_schema' | 'schemata' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' |
 | 'sql_user' | 'system' | 'query_log' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
@@ -465,6 +474,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
 | 'update_on' | 'system' | 'roles' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
 | 'update_on' | 'system' | 'users' | 'Nullable(Timestamp)' | 'TIMESTAMP' | '' | '' | 'YES' | '' |
 | 'updated_on' | 'system' | 'background_tasks' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
+| 'updated_on' | 'system' | 'dictionaries' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
 | 'updated_on' | 'system' | 'indexes' | 'Nullable(Timestamp)' | 'TIMESTAMP' | '' | '' | 'YES' | '' |
 | 'updated_on' | 'system' | 'password_policies' | 'Nullable(Timestamp)' | 'TIMESTAMP' | '' | '' | 'YES' | '' |
 | 'updated_on' | 'system' | 'streams' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' |
diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs
index 03b90fd00bb22..b7023e6a4e2e1 100644
--- a/src/query/sql/src/planner/binder/binder.rs
+++ b/src/query/sql/src/planner/binder/binder.rs
@@ -288,7 +288,7 @@ impl<'a> Binder {
             Statement::CreateDictionary(stmt) => self.bind_create_dictionary(stmt).await?,
             Statement::DropDictionary(stmt) => self.bind_drop_dictionary(stmt).await?,
             Statement::ShowCreateDictionary(stmt) => self.bind_show_create_dictionary(stmt).await?,
-            Statement::ShowDictionaries { show_options: _ } => todo!(),
+            Statement::ShowDictionaries(stmt) => self.bind_show_dictionaries(bind_context, stmt).await?,
             // Views
             Statement::CreateView(stmt) => self.bind_create_view(stmt).await?,
             Statement::AlterView(stmt) => self.bind_alter_view(stmt).await?,
diff --git a/src/query/sql/src/planner/binder/ddl/dictionary.rs b/src/query/sql/src/planner/binder/ddl/dictionary.rs
index 6b54019fd5eea..800b16ef5bff1 100644
--- a/src/query/sql/src/planner/binder/ddl/dictionary.rs
+++ b/src/query/sql/src/planner/binder/ddl/dictionary.rs
@@ -19,6 +19,8 @@ use std::sync::LazyLock;
 use databend_common_ast::ast::CreateDictionaryStmt;
 use databend_common_ast::ast::DropDictionaryStmt;
 use databend_common_ast::ast::ShowCreateDictionaryStmt;
+use databend_common_ast::ast::ShowDictionariesStmt;
+use databend_common_ast::ast::ShowLimit;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::DataType;
@@ -28,12 +30,16 @@ use databend_common_expression::TableDataType;
 use databend_common_expression::TableSchema;
 use databend_common_meta_app::schema::DictionaryMeta;
 use itertools::Itertools;
+use log::debug;
 
 use crate::plans::CreateDictionaryPlan;
 use crate::plans::DropDictionaryPlan;
 use crate::plans::Plan;
+use crate::plans::RewriteKind;
 use crate::plans::ShowCreateDictionaryPlan;
+use crate::BindContext;
 use crate::Binder;
+use crate::SelectBuilder;
 
 pub const DICT_OPT_KEY_SQL_HOST: &str = "host";
 pub const DICT_OPT_KEY_SQL_PORT: &str = "port";
@@ -383,4 +389,50 @@ impl Binder {
             },
         )))
     }
+
+    #[async_backtrace::framed]
+    pub(in crate::planner::binder) async fn bind_show_dictionaries(
+        &mut self,
+        bind_context: &mut BindContext,
+        stmt: &ShowDictionariesStmt,
+    ) -> Result<Plan> {
+        let ShowDictionariesStmt { database, limit } = stmt;
+
+        let mut select_builder = SelectBuilder::from("system.dictionaries");
+
+        select_builder
+            .with_column("database AS Database")
+            .with_column("name AS Dictionary")
+            .with_column("key_names AS Key_Names")
+            .with_column("key_types AS Key_Types")
+            .with_column("attribute_names AS Attribute_Names")
+            .with_column("attribute_types AS Attribute_Types")
+            .with_column("source AS Source")
+            .with_column("comment AS Comment");
+
+        select_builder
+            .with_order_by("database")
+            .with_order_by("name");
+
+        let database = self.check_database_exist(&None, database).await?;
+        select_builder.with_filter(format!("database = '{}'", database.clone()));
+
+        match limit {
+            None => (),
+            Some(ShowLimit::Like { pattern }) => {
+                select_builder.with_filter(format!("name LIKE '{pattern}'"));
+            }
+            Some(ShowLimit::Where { selection }) => {
+                select_builder.with_filter(format!("({selection})"));
+            }
+        };
+        let query = select_builder.build();
+        debug!("show dictionaries rewrite to: {:?}", query);
+        self.bind_rewrite_to_query(
+            bind_context,
+            query.as_str(),
+            RewriteKind::ShowDictionaries(database.clone()),
+        )
+        .await
+    }
 }
diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs
index 245fa278e83ea..d2df531f5be73 100644
--- a/src/query/sql/src/planner/plans/plan.rs
+++ b/src/query/sql/src/planner/plans/plan.rs
@@ -390,6 +390,7 @@ pub enum RewriteKind {
     ShowColumns(String, String, String),
     ShowTablesStatus,
     ShowVirtualColumns,
+    ShowDictionaries(String),
 
     ShowStreams(String),
diff --git a/src/query/storages/system/src/dictionaries_table.rs b/src/query/storages/system/src/dictionaries_table.rs
new file mode 100644
index 0000000000000..9cc0c61fafcdb
--- /dev/null
+++ b/src/query/storages/system/src/dictionaries_table.rs
@@ -0,0 +1,200 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_catalog::plan::PushDownInfo;
+use databend_common_catalog::table::Table;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::Result;
+use databend_common_expression::types::DataType;
+use databend_common_expression::types::StringType;
+use databend_common_expression::types::TimestampType;
+use databend_common_expression::ColumnBuilder;
+use databend_common_expression::DataBlock;
+use databend_common_expression::FromData;
+use databend_common_expression::ScalarRef;
+use databend_common_expression::TableDataType;
+use databend_common_expression::TableField;
+use databend_common_expression::TableSchemaRefExt;
+use databend_common_meta_app::schema::ListDictionaryReq;
+use databend_common_meta_app::schema::TableIdent;
+use databend_common_meta_app::schema::TableInfo;
+use databend_common_meta_app::schema::TableMeta;
+
+use crate::table::AsyncOneBlockSystemTable;
+use crate::table::AsyncSystemTable;
+
+pub struct DictionariesTable {
+    table_info: TableInfo,
+}
+
+#[async_trait::async_trait]
+impl AsyncSystemTable for DictionariesTable {
+    const NAME: &'static str = "system.dictionaries";
+
+    fn get_table_info(&self) -> &TableInfo {
+        &self.table_info
+    }
+
+    #[async_backtrace::framed]
+    async fn get_full_data(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        _push_downs: Option<PushDownInfo>,
+    ) -> Result<DataBlock> {
+        let tenant = ctx.get_tenant();
+
+        let mut db_names = vec![];
+        let mut names = vec![];
+
+        let mut key_names_builder =
+            ColumnBuilder::with_capacity(&DataType::Array(Box::new(DataType::String)), 0);
+        let mut attribute_names_builder =
+            ColumnBuilder::with_capacity(&DataType::Array(Box::new(DataType::String)), 0);
+        let mut key_types_builder =
+            ColumnBuilder::with_capacity(&DataType::Array(Box::new(DataType::String)), 0);
+        let mut attribute_types_builder =
+            ColumnBuilder::with_capacity(&DataType::Array(Box::new(DataType::String)), 0);
+
+        let mut sources = vec![];
+        let mut comments = vec![];
+        let mut created_ons = vec![];
+        let mut updated_ons = vec![];
+
+        let catalog = ctx.get_default_catalog().unwrap();
+        let databases = catalog.list_databases(&tenant).await?;
+        for database in databases {
+            let db_id = database.get_db_info().database_id.db_id;
+            let req = ListDictionaryReq {
+                tenant: tenant.clone(),
+                db_id,
+            };
+            let dictionaries = catalog.list_dictionaries(req).await?;
+            for (dict_name, dict_meta) in dictionaries {
+                db_names.push(database.get_db_name().to_string());
+
+                names.push(dict_name.clone());
+
+                let comment = dict_meta.comment;
+                comments.push(comment);
+
+                let created_on = dict_meta.created_on.timestamp_micros();
+                created_ons.push(created_on);
+                let updated_on = match dict_meta.updated_on {
+                    Some(updated_on) => updated_on.timestamp_micros(),
+                    None => created_on,
+                };
+                updated_ons.push(updated_on);
+
+                let schema = dict_meta.schema;
+                let fields = &schema.fields;
+                let primary_column_ids = dict_meta.primary_column_ids;
+
+                let mut key_names = vec![];
+                let mut attribute_names = vec![];
+                let mut key_types = vec![];
+                let mut attribute_types = vec![];
+
+                for field in fields {
+                    if primary_column_ids.contains(&field.column_id) {
+                        key_names.push(field.name.clone());
+                        key_types.push(field.data_type.sql_name());
+                    } else {
+                        attribute_names.push(field.name.clone());
+                        attribute_types.push(field.data_type.sql_name());
+                    }
+                }
+                let key_names_column = ScalarRef::Array(StringType::from_data(key_names));
+                key_names_builder.push(key_names_column);
+                let attribute_names_column =
+                    ScalarRef::Array(StringType::from_data(attribute_names));
+                attribute_names_builder.push(attribute_names_column);
+                let key_types_column = ScalarRef::Array(StringType::from_data(key_types));
+                key_types_builder.push(key_types_column);
+                let attribute_types_column =
+                    ScalarRef::Array(StringType::from_data(attribute_types));
+                attribute_types_builder.push(attribute_types_column);
+
+                let dict_source = dict_meta.source;
+                let mut options = dict_meta.options;
+                if let Some(password) = options.get_mut("password") {
+                    *password = "[hidden]".to_string();
+                }
+                let options_str: Vec<String> = options
+                    .iter()
+                    .map(|(k, v)| format!("{}={}", k, v))
+                    .collect();
+                let options_joined = options_str.join(" ");
+                let source = format!("{}({})", dict_source, options_joined);
+                sources.push(source);
+            }
+        }
+        return Ok(DataBlock::new_from_columns(vec![
+            StringType::from_data(db_names),
+            StringType::from_data(names),
+            key_names_builder.build(),
+            key_types_builder.build(),
+            attribute_names_builder.build(),
+            attribute_types_builder.build(),
+            StringType::from_data(sources),
+            StringType::from_data(comments),
+            TimestampType::from_data(created_ons),
+            TimestampType::from_data(updated_ons),
+        ]));
+    }
+}
+
+impl DictionariesTable {
+    pub fn create(table_id: u64) -> Arc<dyn Table> {
+        let schema = TableSchemaRefExt::create(vec![
+            TableField::new("database", TableDataType::String),
+            TableField::new("name", TableDataType::String),
+            TableField::new(
+                "key_names",
+                TableDataType::Array(Box::new(TableDataType::String)),
+            ),
+            TableField::new(
+                "key_types",
+                TableDataType::Array(Box::new(TableDataType::String)),
+            ),
+            TableField::new(
+                "attribute_names",
+                TableDataType::Array(Box::new(TableDataType::String)),
+            ),
+            TableField::new(
+                "attribute_types",
+                TableDataType::Array(Box::new(TableDataType::String)),
+            ),
+            TableField::new("source", TableDataType::String),
+            TableField::new("comment", TableDataType::String),
+            TableField::new("created_on", TableDataType::Timestamp),
+            TableField::new("updated_on", TableDataType::Timestamp),
+        ]);
+
+        let table_info = TableInfo {
+            desc: "'system'.'dictionaries'".to_string(),
+            name: "dictionaries".to_string(),
+            ident: TableIdent::new(table_id, 0),
+            meta: TableMeta {
+                schema,
+                engine: "SystemDictionaries".to_string(),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        AsyncOneBlockSystemTable::create(DictionariesTable { table_info })
+    }
+}
diff --git a/src/query/storages/system/src/lib.rs b/src/query/storages/system/src/lib.rs
index 8cbaee5ac4db7..85c14c4045c3c 100644
--- a/src/query/storages/system/src/lib.rs
+++ b/src/query/storages/system/src/lib.rs
@@ -33,6 +33,7 @@ mod configs_table;
 mod contributors_table;
 mod credits_table;
 mod databases_table;
+mod dictionaries_table;
 mod engines_table;
 mod functions_table;
 mod indexes_table;
@@ -80,6 +81,7 @@ pub use configs_table::ConfigsTable;
 pub use contributors_table::ContributorsTable;
 pub use credits_table::CreditsTable;
 pub use databases_table::DatabasesTable;
+pub use dictionaries_table::DictionariesTable;
 pub use engines_table::EnginesTable;
 pub use functions_table::FunctionsTable;
 pub use indexes_table::IndexesTable;
diff --git a/tests/sqllogictests/suites/base/06_show/06_0024_show_dictionaries.test b/tests/sqllogictests/suites/base/06_show/06_0024_show_dictionaries.test
new file mode 100644
index 0000000000000..914bc37fdba21
--- /dev/null
+++ b/tests/sqllogictests/suites/base/06_show/06_0024_show_dictionaries.test
@@ -0,0 +1,65 @@
+statement ok
+CREATE OR REPLACE DICTIONARY d1(c1 int NOT NULL, c2 Varchar NOT NULL) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table'))
+
+statement ok
+CREATE OR REPLACE DICTIONARY d2(a int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table'))
+
+query T
+show dictionaries
+----
+default d1 ['c1'] ['INT'] ['c2'] ['VARCHAR'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+default d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+
+statement ok
+DROP DATABASE IF EXISTS show_dictionary
+
+statement ok
+CREATE DATABASE show_dictionary
+
+statement ok
+use show_dictionary
+
+statement ok
+CREATE OR REPLACE DICTIONARY show_dictionary.d1(c1 VARCHAR NOT NULL, c2 VARCHAR NOT NULL) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table'))
+
+statement ok
+CREATE OR REPLACE DICTIONARY show_dictionary.d2(a int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table'))
+
+statement ok
+CREATE OR REPLACE DICTIONARY show_dictionary.d3(`a` int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table'))
+
+query T
+show dictionaries from show_dictionary
+----
+show_dictionary d1 ['c1'] ['VARCHAR'] ['c2'] ['VARCHAR'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+show_dictionary d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+show_dictionary d3 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+
+query T
+show dictionaries from show_dictionary like 'd%'
+----
+show_dictionary d1 ['c1'] ['VARCHAR'] ['c2'] ['VARCHAR'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+show_dictionary d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+show_dictionary d3 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+
+query T
+show dictionaries from show_dictionary WHERE name = 'd2' OR 1 = 1
+----
+show_dictionary d1 ['c1'] ['VARCHAR'] ['c2'] ['VARCHAR'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+show_dictionary d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+show_dictionary d3 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+
+query T
+show dictionaries from show_dictionary WHERE name = 'd2' AND 1 = 1
+----
+show_dictionary d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+
+statement ok
+show dictionaries WHERE name='d2' AND 1=0
+
+query T
+show dictionaries
+----
+show_dictionary d1 ['c1'] ['VARCHAR'] ['c2'] ['VARCHAR'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+show_dictionary d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)
+show_dictionary d3 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty)