From 2e2e5f6afcc959ec5330c67838584598dc5f199b Mon Sep 17 00:00:00 2001 From: andylokandy Date: Tue, 15 Oct 2024 19:12:30 +0800 Subject: [PATCH] fix --- .../expression/src/converts/arrow2/from.rs | 3 +- .../expression/src/filter/filter_executor.rs | 2 +- .../select_value/select_column_scalar.rs | 8 +-- src/query/expression/src/kernels/concat.rs | 1 - .../group_by_hash/method_single_string.rs | 1 - .../src/kernels/group_by_hash/utils.rs | 2 - src/query/expression/src/kernels/scatter.rs | 17 +----- src/query/expression/src/kernels/sort.rs | 2 +- src/query/expression/src/kernels/take.rs | 18 ++---- .../expression/src/kernels/take_chunks.rs | 55 ++++++------------- .../expression/src/kernels/take_compact.rs | 1 - .../expression/src/kernels/take_ranges.rs | 1 - src/query/expression/src/types/binary.rs | 6 -- src/query/expression/src/types/geography.rs | 2 - src/query/expression/src/types/string.rs | 4 -- src/query/expression/src/utils/arrow.rs | 7 +-- src/query/expression/tests/it/common.rs | 2 +- src/query/expression/tests/it/kernel.rs | 18 ++---- src/query/functions/src/scalars/binary.rs | 1 - .../src/pipelines/builders/builder_join.rs | 5 -- .../hash_join/hash_join_probe_state.rs | 4 -- .../merge_into_hash_join_optimization.rs | 5 +- .../hash_join/probe_join/inner_join.rs | 7 +-- .../hash_join/probe_join/left_anti_join.rs | 14 +---- .../hash_join/probe_join/left_join.rs | 13 +---- .../hash_join/probe_join/left_mark_join.rs | 7 +-- .../hash_join/probe_join/left_semi_join.rs | 9 +-- .../hash_join/probe_join/right_join.rs | 7 +-- .../hash_join/probe_join/right_mark_join.rs | 7 +-- .../probe_join/right_semi_anti_join.rs | 7 +-- .../transforms/hash_join/probe_state.rs | 12 +--- .../processors/transforms/hash_join/row.rs | 2 - .../hash_join/transform_hash_join_probe.rs | 2 - .../fuse/src/statistics/cluster_statistics.rs | 2 +- 34 files changed, 52 insertions(+), 202 deletions(-) diff --git a/src/query/expression/src/converts/arrow2/from.rs b/src/query/expression/src/converts/arrow2/from.rs index d58fe5a0b202..6989b95ed324 100644 --- a/src/query/expression/src/converts/arrow2/from.rs +++ b/src/query/expression/src/converts/arrow2/from.rs @@ -31,7 +31,6 @@ use super::ARROW_EXT_TYPE_EMPTY_MAP; use super::ARROW_EXT_TYPE_GEOMETRY; use super::ARROW_EXT_TYPE_VARIANT; use crate::types::array::ArrayColumn; -use crate::types::binary; use crate::types::binary::BinaryColumn; use crate::types::binary::BinaryColumnBuilder; use crate::types::decimal::DecimalColumn; @@ -365,7 +364,7 @@ impl Column { ); Column::Binary(binary_array_to_binary_column(arrow_col)) } - (DataType::Binary, ArrowDataType::FixedSizeBinary(size)) => { + (DataType::Binary, ArrowDataType::FixedSizeBinary(_)) => { let arrow_col = arrow_col .as_any() .downcast_ref::() diff --git a/src/query/expression/src/filter/filter_executor.rs b/src/query/expression/src/filter/filter_executor.rs index 007f6d319e9c..495ff1f80cc1 100644 --- a/src/query/expression/src/filter/filter_executor.rs +++ b/src/query/expression/src/filter/filter_executor.rs @@ -125,7 +125,7 @@ impl FilterExecutor { if self.keep_order && self.has_or { self.true_selection[0..result_count].sort(); } - data_block.take(&self.true_selection[0..result_count], &mut None) + data_block.take(&self.true_selection[0..result_count]) } } diff --git a/src/query/expression/src/filter/select_value/select_column_scalar.rs b/src/query/expression/src/filter/select_value/select_column_scalar.rs index d85e44427ca3..8c2bad57e108 100644 --- a/src/query/expression/src/filter/select_value/select_column_scalar.rs +++ b/src/query/expression/src/filter/select_value/select_column_scalar.rs @@ -242,9 +242,9 @@ impl<'a> Selector<'a> { for idx in 0u32..count as u32 { let ret = if NOT { validity.get_bit_unchecked(idx as usize) - && !searcher + && searcher .search(column.index_unchecked_bytes(idx as usize)) - .is_some() + .is_none() } else { validity.get_bit_unchecked(idx as usize) && searcher @@ -273,9 +273,9 @@ impl<'a> Selector<'a> { if let LikePattern::SurroundByPercent(searcher) = like_pattern { for idx in 0u32..count as u32 { let ret = if NOT { - !searcher + searcher .search(column.index_unchecked_bytes(idx as usize)) - .is_some() + .is_none() } else { searcher .search(column.index_unchecked_bytes(idx as usize)) diff --git a/src/query/expression/src/kernels/concat.rs b/src/query/expression/src/kernels/concat.rs index 39035c5a02cf..11fd9f26abca 100644 --- a/src/query/expression/src/kernels/concat.rs +++ b/src/query/expression/src/kernels/concat.rs @@ -23,7 +23,6 @@ use itertools::Itertools; use crate::copy_continuous_bits; use crate::kernels::take::BIT_MASK; -use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; use crate::store_advance_aligned; use crate::types::array::ArrayColumnBuilder; diff --git a/src/query/expression/src/kernels/group_by_hash/method_single_string.rs b/src/query/expression/src/kernels/group_by_hash/method_single_string.rs index 22a59a09a110..ac3f5192f18c 100644 --- a/src/query/expression/src/kernels/group_by_hash/method_single_string.rs +++ b/src/query/expression/src/kernels/group_by_hash/method_single_string.rs @@ -13,7 +13,6 @@ // limitations under the License. use databend_common_arrow::arrow::array::BinaryViewArray; -use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; use databend_common_hashtable::hash_join_fast_string_hash; diff --git a/src/query/expression/src/kernels/group_by_hash/utils.rs b/src/query/expression/src/kernels/group_by_hash/utils.rs index 85bcb54d6ae4..9de74a45a73d 100644 --- a/src/query/expression/src/kernels/group_by_hash/utils.rs +++ b/src/query/expression/src/kernels/group_by_hash/utils.rs @@ -15,9 +15,7 @@ use ethnum::i256; use crate::kernels::utils::copy_advance_aligned; -use crate::kernels::utils::set_vec_len_by_ptr; use crate::kernels::utils::store_advance; -use crate::kernels::utils::store_advance_aligned; use crate::types::binary::BinaryColumn; use crate::types::binary::BinaryColumnBuilder; use crate::types::decimal::DecimalColumn; diff --git a/src/query/expression/src/kernels/scatter.rs b/src/query/expression/src/kernels/scatter.rs index 11d4ffbea53f..2edb5353eb00 100644 --- a/src/query/expression/src/kernels/scatter.rs +++ b/src/query/expression/src/kernels/scatter.rs @@ -33,24 +33,9 @@ impl DataBlock { let scatter_indices = Self::divide_indices_by_scatter_size(indices, scatter_size); - let has_string_column = self - .columns() - .iter() - .any(|col| col.data_type.is_string_column()); - let mut string_items_buf = if has_string_column { - let max_num_rows = scatter_indices - .iter() - .map(|indices| indices.len()) - .max() - .unwrap(); - Some(vec![(0, 0); max_num_rows]) - } else { - None - }; - let mut results = Vec::with_capacity(scatter_size); for indices in scatter_indices.iter().take(scatter_size) { - let block = self.take(indices, &mut string_items_buf)?; + let block = self.take(indices)?; results.push(block); } diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs index 9c4c478834f4..ef6c51fd6fdd 100644 --- a/src/query/expression/src/kernels/sort.rs +++ b/src/query/expression/src/kernels/sort.rs @@ -115,7 +115,7 @@ impl DataBlock { } let permutations = sort_compare.take_permutation(); - DataBlock::take(block, &permutations, &mut None) + DataBlock::take(block, &permutations) } } diff --git a/src/query/expression/src/kernels/take.rs b/src/query/expression/src/kernels/take.rs index c68870ed5e7f..2021d8edb80a 100644 --- a/src/query/expression/src/kernels/take.rs +++ b/src/query/expression/src/kernels/take.rs @@ -19,8 +19,6 @@ use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; -use crate::kernels::utils::copy_advance_aligned; -use crate::kernels::utils::set_vec_len_by_ptr; use crate::types::binary::BinaryColumn; use crate::types::nullable::NullableColumn; use crate::types::string::StringColumn; @@ -34,19 +32,13 @@ use crate::Value; pub const BIT_MASK: [u8; 8] = [1, 2, 4, 8, 16, 32, 64, 128]; impl DataBlock { - pub fn take( - &self, - indices: &[I], - string_items_buf: &mut Option>, - ) -> Result - where - I: databend_common_arrow::arrow::types::Index, - { + pub fn take(&self, indices: &[I]) -> Result + where I: databend_common_arrow::arrow::types::Index { if indices.is_empty() { return Ok(self.slice(0..0)); } - let mut taker = TakeVisitor::new(indices, string_items_buf); + let mut taker = TakeVisitor::new(indices); let after_columns = self .columns() @@ -73,17 +65,15 @@ struct TakeVisitor<'a, I> where I: databend_common_arrow::arrow::types::Index { indices: &'a [I], - string_items_buf: &'a mut Option>, result: Option>, } impl<'a, I> TakeVisitor<'a, I> where I: databend_common_arrow::arrow::types::Index { - fn new(indices: &'a [I], string_items_buf: &'a mut Option>) -> Self { + fn new(indices: &'a [I]) -> Self { Self { indices, - string_items_buf, result: None, } } diff --git a/src/query/expression/src/kernels/take_chunks.rs b/src/query/expression/src/kernels/take_chunks.rs index bc4fb363ff45..343b577add02 100644 --- a/src/query/expression/src/kernels/take_chunks.rs +++ b/src/query/expression/src/kernels/take_chunks.rs @@ -22,8 +22,6 @@ use databend_common_hashtable::RowPtr; use itertools::Itertools; use crate::kernels::take::BIT_MASK; -use crate::kernels::utils::copy_advance_aligned; -use crate::kernels::utils::set_vec_len_by_ptr; use crate::types::array::ArrayColumnBuilder; use crate::types::binary::BinaryColumn; use crate::types::bitmap::BitmapType; @@ -112,7 +110,6 @@ impl DataBlock { build_columns_data_type: &[DataType], indices: &[RowPtr], result_size: usize, - binary_items_buf: &mut Option>, ) -> Self { let num_columns = build_columns.len(); let result_columns = (0..num_columns) @@ -123,7 +120,6 @@ impl DataBlock { data_type.clone(), indices, result_size, - binary_items_buf, ); BlockEntry::new(data_type.clone(), Value::Column(column)) }) @@ -632,7 +628,6 @@ impl Column { data_type: DataType, indices: &[RowPtr], result_size: usize, - binary_items_buf: &mut Option>, ) -> Column { match &columns { ColumnVec::Null { .. } => Column::Null { len: result_size }, @@ -656,12 +651,12 @@ impl Column { ColumnVec::Boolean(columns) => { Column::Boolean(Self::take_block_vec_boolean_types(columns, indices)) } - ColumnVec::Binary(columns) => BinaryType::upcast_column( - Self::take_block_vec_binary_types(columns, indices, binary_items_buf.as_mut()), - ), - ColumnVec::String(columns) => StringType::upcast_column( - Self::take_block_vec_string_types(columns, indices, binary_items_buf.as_mut()), - ), + ColumnVec::Binary(columns) => { + BinaryType::upcast_column(Self::take_block_vec_binary_types(columns, indices)) + } + ColumnVec::String(columns) => { + StringType::upcast_column(Self::take_block_vec_string_types(columns, indices)) + } ColumnVec::Timestamp(columns) => { let builder = Self::take_block_vec_primitive_types(columns, indices); let ts = >::upcast_column(>::column_from_vec( @@ -714,9 +709,9 @@ impl Column { columns, builder, indices, ) } - ColumnVec::Bitmap(columns) => BitmapType::upcast_column( - Self::take_block_vec_binary_types(columns, indices, binary_items_buf.as_mut()), - ), + ColumnVec::Bitmap(columns) => { + BitmapType::upcast_column(Self::take_block_vec_binary_types(columns, indices)) + } ColumnVec::Nullable(columns) => { let inner_data_type = data_type.as_nullable().unwrap(); let inner_column = Self::take_column_vec_indices( @@ -724,7 +719,6 @@ impl Column { *inner_data_type.clone(), indices, result_size, - binary_items_buf, ); let inner_bitmap = Self::take_column_vec_indices( @@ -732,7 +726,6 @@ impl Column { DataType::Boolean, indices, result_size, - binary_items_buf, ); NullableColumn::new_column( @@ -751,25 +744,22 @@ impl Column { ty.clone(), indices, result_size, - binary_items_buf, ) }) .collect(); Column::Tuple(fields) } - ColumnVec::Variant(columns) => VariantType::upcast_column( - Self::take_block_vec_binary_types(columns, indices, binary_items_buf.as_mut()), - ), - ColumnVec::Geometry(columns) => GeometryType::upcast_column( - Self::take_block_vec_binary_types(columns, indices, binary_items_buf.as_mut()), - ), + ColumnVec::Variant(columns) => { + VariantType::upcast_column(Self::take_block_vec_binary_types(columns, indices)) + } + ColumnVec::Geometry(columns) => { + GeometryType::upcast_column(Self::take_block_vec_binary_types(columns, indices)) + } ColumnVec::Geography(columns) => { let columns = columns.iter().map(|x| x.0.clone()).collect::>(); GeographyType::upcast_column(GeographyColumn(Self::take_block_vec_binary_types( - &columns, - indices, - binary_items_buf.as_mut(), + &columns, indices, ))) } } @@ -785,11 +775,7 @@ impl Column { builder } - pub fn take_block_vec_binary_types( - col: &[BinaryColumn], - indices: &[RowPtr], - binary_items_buf: Option<&mut Vec<(u64, usize)>>, - ) -> BinaryColumn { + pub fn take_block_vec_binary_types(col: &[BinaryColumn], indices: &[RowPtr]) -> BinaryColumn { let mut builder = BinaryColumnBuilder::with_capacity(indices.len(), 0); for row_ptr in indices { unsafe { @@ -802,11 +788,7 @@ impl Column { builder.build() } - pub fn take_block_vec_string_types( - cols: &[StringColumn], - indices: &[RowPtr], - binary_items_buf: Option<&mut Vec<(u64, usize)>>, - ) -> StringColumn { + pub fn take_block_vec_string_types(cols: &[StringColumn], indices: &[RowPtr]) -> StringColumn { let binary_cols = cols .iter() .map(|col| col.clone().into()) @@ -815,7 +797,6 @@ impl Column { StringColumn::from_binary_unchecked(Self::take_block_vec_binary_types( &binary_cols, indices, - binary_items_buf, )) } } diff --git a/src/query/expression/src/kernels/take_compact.rs b/src/query/expression/src/kernels/take_compact.rs index 0f33479f3100..1ed81cd76a28 100644 --- a/src/query/expression/src/kernels/take_compact.rs +++ b/src/query/expression/src/kernels/take_compact.rs @@ -18,7 +18,6 @@ use databend_common_exception::Result; use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; -use crate::kernels::utils::store_advance_aligned; use crate::types::binary::BinaryColumn; use crate::types::nullable::NullableColumn; use crate::types::string::StringColumn; diff --git a/src/query/expression/src/kernels/take_ranges.rs b/src/query/expression/src/kernels/take_ranges.rs index 4a89a84a2a4f..7a5c769fb954 100644 --- a/src/query/expression/src/kernels/take_ranges.rs +++ b/src/query/expression/src/kernels/take_ranges.rs @@ -22,7 +22,6 @@ use databend_common_exception::Result; use crate::copy_continuous_bits; use crate::kernels::take::BIT_MASK; -use crate::kernels::utils::copy_advance_aligned; use crate::kernels::utils::set_vec_len_by_ptr; use crate::kernels::utils::store_advance_aligned; use crate::types::binary::BinaryColumn; diff --git a/src/query/expression/src/types/binary.rs b/src/query/expression/src/types/binary.rs index 8956998ff42d..3676ed76b8c9 100644 --- a/src/query/expression/src/types/binary.rs +++ b/src/query/expression/src/types/binary.rs @@ -14,16 +14,11 @@ use std::cmp::Ordering; use std::iter::once; -use std::marker::PhantomData; use std::ops::Range; -use databend_common_arrow::arrow::array::Array; use databend_common_arrow::arrow::array::BinaryViewArray; use databend_common_arrow::arrow::array::MutableBinaryViewArray; -use databend_common_arrow::arrow::buffer::Buffer; use databend_common_arrow::arrow::trusted_len::TrustedLen; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; use serde::Deserialize; use serde::Serialize; @@ -33,7 +28,6 @@ use crate::types::DataType; use crate::types::DecimalSize; use crate::types::GenericMap; use crate::types::ValueType; -use crate::utils::arrow::buffer_into_mut; use crate::values::Column; use crate::values::Scalar; use crate::ColumnBuilder; diff --git a/src/query/expression/src/types/geography.rs b/src/query/expression/src/types/geography.rs index 403afa408a06..921dd7de0c06 100644 --- a/src/query/expression/src/types/geography.rs +++ b/src/query/expression/src/types/geography.rs @@ -15,13 +15,11 @@ use std::cmp::Ordering; use std::fmt::Debug; use std::hash::Hash; -use std::marker::PhantomData; use std::ops::Range; use borsh::BorshDeserialize; use borsh::BorshSerialize; use databend_common_arrow::arrow::trusted_len::TrustedLen; -use databend_common_arrow::parquet::encoding::plain_byte_array::BinaryIter; use databend_common_exception::Result; use databend_common_io::geography::*; use databend_common_io::wkb::make_point; diff --git a/src/query/expression/src/types/string.rs b/src/query/expression/src/types/string.rs index bff4c5e6ecaa..cf26dff46ada 100644 --- a/src/query/expression/src/types/string.rs +++ b/src/query/expression/src/types/string.rs @@ -13,12 +13,9 @@ // limitations under the License. use std::cmp::Ordering; -use std::iter::once; use std::ops::Range; -use databend_common_arrow::arrow::array::Array; use databend_common_arrow::arrow::array::BinaryViewArray; -use databend_common_arrow::arrow::buffer::Buffer; use databend_common_arrow::arrow::trusted_len::TrustedLen; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -34,7 +31,6 @@ use crate::types::DataType; use crate::types::DecimalSize; use crate::types::GenericMap; use crate::types::ValueType; -use crate::utils::arrow::buffer_into_mut; use crate::values::Column; use crate::values::Scalar; use crate::ColumnBuilder; diff --git a/src/query/expression/src/utils/arrow.rs b/src/query/expression/src/utils/arrow.rs index 36e6ce396119..7d3f20dc0127 100644 --- a/src/query/expression/src/utils/arrow.rs +++ b/src/query/expression/src/utils/arrow.rs @@ -19,8 +19,6 @@ use std::io::Write; use databend_common_arrow::arrow; use databend_common_arrow::arrow::array::Array; -use databend_common_arrow::arrow::array::BinaryArray; -use databend_common_arrow::arrow::array::BinaryViewArray; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::bitmap::MutableBitmap; use databend_common_arrow::arrow::buffer::Buffer; @@ -29,13 +27,10 @@ use databend_common_arrow::arrow::io::ipc::read::read_file_metadata; use databend_common_arrow::arrow::io::ipc::read::FileReader; use databend_common_arrow::arrow::io::ipc::write::Compression; use databend_common_arrow::arrow::io::ipc::write::FileWriter; -use databend_common_arrow::arrow::io::ipc::write::WriteOptions as IpcWriteOptions; -use databend_common_arrow::arrow::types::Offset; +use databend_common_arrow::arrow::io::ipc::write::WriteOptions; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use crate::types::binary::BinaryColumnBuilder; -use crate::types::BinaryColumn; use crate::BlockEntry; use crate::Column; use crate::ColumnBuilder; diff --git a/src/query/expression/tests/it/common.rs b/src/query/expression/tests/it/common.rs index c2dd01b0f952..38a3bdaf16ad 100644 --- a/src/query/expression/tests/it/common.rs +++ b/src/query/expression/tests/it/common.rs @@ -127,7 +127,7 @@ pub fn run_scatter(file: &mut impl Write, block: &DataBlock, indices: &[u32], sc } pub fn run_take(file: &mut impl Write, indices: &[u32], block: &DataBlock) { - let result = DataBlock::take(block, indices, &mut None); + let result = DataBlock::take(block, indices); match result { Ok(result_block) => { diff --git a/src/query/expression/tests/it/kernel.rs b/src/query/expression/tests/it/kernel.rs index 1a9e078d8d91..a37de74e342d 100644 --- a/src/query/expression/tests/it/kernel.rs +++ b/src/query/expression/tests/it/kernel.rs @@ -310,15 +310,9 @@ pub fn test_take_and_filter_and_concat() -> databend_common_exception::Result<() .collect_vec(); let concated_blocks = DataBlock::concat(&blocks)?; - let block_1 = concated_blocks.take(&take_indices, &mut None)?; + let block_1 = concated_blocks.take(&take_indices)?; let block_2 = concated_blocks.take_compacted_indices(&take_compact_indices, count)?; - let block_3 = DataBlock::take_column_vec( - &column_vec, - &data_types, - &take_chunks_indices, - count, - &mut None, - ); + let block_3 = DataBlock::take_column_vec(&column_vec, &data_types, &take_chunks_indices, count); let block_4 = DataBlock::concat(&filtered_blocks)?; let block_5 = concated_blocks.take_ranges( &build_range_selection(&take_indices, take_indices.len()), @@ -417,7 +411,7 @@ pub fn test_take_compact() -> databend_common_exception::Result<()> { take_indices.extend(std::iter::repeat(batch_index as u32).take(batch_size)); take_compact_indices.push((batch_index as u32, batch_size as u32)); } - let block_1 = block.take(&take_indices, &mut None)?; + let block_1 = block.take(&take_indices)?; let block_2 = block.take_compacted_indices(&take_compact_indices, count)?; assert_eq!(block_1.num_columns(), block_2.num_columns()); @@ -485,8 +479,8 @@ pub fn test_filters() -> databend_common_exception::Result<()> { .map(|(i, _)| i as u32) .collect::>(); - let t_b = bb.take(&indices, &mut None)?; - let t_c = cc.take(&indices, &mut None)?; + let t_b = bb.take(&indices)?; + let t_c = cc.take(&indices)?; let f_b = bb.filter_with_bitmap(&f)?; let f_c = cc.filter_with_bitmap(&f)?; @@ -574,7 +568,7 @@ pub fn test_scatter() -> databend_common_exception::Result<()> { } } - let block_1 = random_block.take(&take_indices, &mut None)?; + let block_1 = random_block.take(&take_indices)?; let block_2 = DataBlock::concat(&scattered_blocks)?; assert_eq!(block_1.num_columns(), block_2.num_columns()); diff --git a/src/query/functions/src/scalars/binary.rs b/src/query/functions/src/scalars/binary.rs index 8f79ad010da1..fad04ace15ca 100644 --- a/src/query/functions/src/scalars/binary.rs +++ b/src/query/functions/src/scalars/binary.rs @@ -30,7 +30,6 @@ use databend_common_expression::types::NumberDataType; use databend_common_expression::types::NumberType; use databend_common_expression::types::StringType; use databend_common_expression::types::UInt8Type; -use databend_common_expression::types::ValueType; use databend_common_expression::vectorize_1_arg; use databend_common_expression::Column; use databend_common_expression::EvalContext; diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs index b8833274a1e9..fd5ef8c2f906 100644 --- a/src/query/service/src/pipelines/builders/builder_join.rs +++ b/src/query/service/src/pipelines/builders/builder_join.rs @@ -197,10 +197,6 @@ impl PipelineBuilder { self.main_pipeline.output_len(), barrier, )?); - let mut has_string_column = false; - for field in join.output_schema()?.fields() { - has_string_column |= field.data_type().is_string_column(); - } self.main_pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create(TransformHashJoinProbe::create( @@ -212,7 +208,6 @@ impl PipelineBuilder { self.func_ctx.clone(), &join.join_type, !join.non_equi_conditions.is_empty(), - has_string_column, )?)) })?; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs index 0c38cad7d69a..5e27be5eb94a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs @@ -508,7 +508,6 @@ impl HashJoinProbeState { &generation_state.build_columns, &generation_state.build_columns_data_type, &generation_state.build_num_rows, - &mut probe_state.generation_state.string_items_buf, )?; if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { @@ -597,7 +596,6 @@ impl HashJoinProbeState { &generation_state.build_columns, &generation_state.build_columns_data_type, &generation_state.build_num_rows, - &mut probe_state.generation_state.string_items_buf, )?); build_indexes_idx = 0; } @@ -659,7 +657,6 @@ impl HashJoinProbeState { &generation_state.build_columns, &generation_state.build_columns_data_type, &generation_state.build_num_rows, - &mut probe_state.generation_state.string_items_buf, )?); build_indexes_idx = 0; } @@ -747,7 +744,6 @@ impl HashJoinProbeState { &generation_state.build_columns, &generation_state.build_columns_data_type, &generation_state.build_num_rows, - &mut probe_state.generation_state.string_items_buf, )?; result_blocks.push(self.merge_eq_block( Some(build_block), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs index 26af688afad6..3d5cdbaf0719 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs @@ -344,10 +344,7 @@ impl TransformHashJoinProbe { { let end = (interval.1 - chunk_start).min(start + self.max_block_size as u32 - 1); let range = (start..=end).collect::>(); - let data_block = chunk_block.take( - &range, - &mut self.probe_state.generation_state.string_items_buf, - )?; + let data_block = chunk_block.take(&range)?; assert!(!data_block.is_empty()); let (segment_idx, block_idx) = split_prefix(prefix); info!( diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs index 32294afc57b7..51bcbbbb4cc6 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs @@ -210,11 +210,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -224,7 +220,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs index 813e5b168c1c..6e42f3c92fb2 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs @@ -79,11 +79,7 @@ impl HashJoinProbeState { )); } - let result_block = DataBlock::take( - &process_state.input, - &probe_indexes[0..count], - &mut probe_state.generation_state.string_items_buf, - )?; + let result_block = DataBlock::take(&process_state.input, &probe_indexes[0..count])?; probe_state.process_state = None; @@ -237,7 +233,6 @@ impl HashJoinProbeState { result_blocks.push(DataBlock::take( &process_state.input, &probe_indexes[0..unmatched_idx], - &mut probe_state.generation_state.string_items_buf, )?); } @@ -266,11 +261,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -280,7 +271,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs index f8dc4a100396..6afb2d80ba34 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs @@ -372,11 +372,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - let mut probe_block = DataBlock::take( - input, - &probe_indexes[0..unmatched_idx], - &mut probe_state.string_items_buf, - )?; + let mut probe_block = DataBlock::take(input, &probe_indexes[0..unmatched_idx])?; // For full join, wrap nullable for probe block if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { let nullable_probe_columns = probe_block @@ -435,11 +431,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - let mut probe_block = DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?; + let mut probe_block = DataBlock::take(input, &probe_indexes[0..matched_idx])?; // For full join, wrap nullable for probe block if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { let nullable_probe_columns = probe_block @@ -459,7 +451,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?; // For left or full join, wrap nullable for build block. let nullable_columns = if build_state.build_num_rows == 0 { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs index 23f47e26512f..0292ec9eb554 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs @@ -341,11 +341,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -355,7 +351,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs index 1f98cc837261..bae30ec7ccd4 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs @@ -82,7 +82,6 @@ impl HashJoinProbeState { result_blocks.push(DataBlock::take( &process_state.input, &probe_indexes[0..matched_idx], - &mut probe_state.generation_state.string_items_buf, )?); } @@ -232,7 +231,6 @@ impl HashJoinProbeState { result_blocks.push(DataBlock::take( &process_state.input, &probe_indexes[0..matched_idx], - &mut probe_state.generation_state.string_items_buf, )?); } @@ -261,11 +259,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -275,7 +269,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs index f5730d477934..cac83b169073 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs @@ -242,11 +242,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - let probe_block = DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?; + let probe_block = DataBlock::take(input, &probe_indexes[0..matched_idx])?; // The join type is right join, we need to wrap nullable for probe side. let nullable_columns = probe_block @@ -264,7 +260,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark_join.rs index 270a67cfe9cb..372e46aa0b03 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark_join.rs @@ -276,11 +276,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -290,7 +286,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs index 95098d589c17..d9d345d5fa49 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs @@ -282,11 +282,7 @@ impl HashJoinProbeState { } let probe_block = if probe_state.is_probe_projected { - Some(DataBlock::take( - input, - &probe_indexes[0..matched_idx], - &mut probe_state.string_items_buf, - )?) + Some(DataBlock::take(input, &probe_indexes[0..matched_idx])?) } else { None }; @@ -296,7 +292,6 @@ impl HashJoinProbeState { &build_state.build_columns, &build_state.build_columns_data_type, &build_state.build_num_rows, - &mut probe_state.string_items_buf, )?) } else { None diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs index 87d7c063b1df..87ac376e47da 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs @@ -85,7 +85,6 @@ impl ProbeState { max_block_size: usize, join_type: &JoinType, with_conjunction: bool, - has_string_column: bool, func_ctx: FunctionContext, other_predicate: Option, ) -> Self { @@ -134,7 +133,7 @@ impl ProbeState { process_state: None, max_block_size, mutable_indexes: MutableIndexes::new(max_block_size), - generation_state: ProbeBlockGenerationState::new(max_block_size, has_string_column), + generation_state: ProbeBlockGenerationState::new(max_block_size), selection: vec![0; max_block_size], hashes: vec![0; max_block_size], selection_count: 0, @@ -185,20 +184,13 @@ pub struct ProbeBlockGenerationState { pub(crate) is_probe_projected: bool, // When we need a bitmap that is all true, we can directly slice it to reduce memory usage. pub(crate) true_validity: Bitmap, - // The string_items_buf is used as a buffer to reduce memory allocation when taking [u8] Columns. - pub(crate) string_items_buf: Option>, } impl ProbeBlockGenerationState { - fn new(size: usize, has_string_column: bool) -> Self { + fn new(size: usize) -> Self { Self { is_probe_projected: false, true_validity: Bitmap::new_constant(true, size), - string_items_buf: if has_string_column { - Some(vec![(0, 0); size]) - } else { - None - }, } } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs index 776c88dfeb8b..059e4ed2483a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs @@ -65,7 +65,6 @@ impl RowSpace { build_columns: &[ColumnVec], build_columns_data_type: &[DataType], num_rows: &usize, - string_items_buf: &mut Option>, ) -> Result { if *num_rows != 0 { let data_block = DataBlock::take_column_vec( @@ -73,7 +72,6 @@ impl RowSpace { build_columns_data_type, row_ptrs, row_ptrs.len(), - string_items_buf, ); Ok(data_block) } else { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs index c478a19f5ebf..cd113272c8b3 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs @@ -126,7 +126,6 @@ impl TransformHashJoinProbe { func_ctx: FunctionContext, join_type: &JoinType, with_conjunct: bool, - has_string_column: bool, ) -> Result> { join_probe_state.probe_attach(); // Create a hash join spiller. @@ -153,7 +152,6 @@ impl TransformHashJoinProbe { max_block_size, join_type, with_conjunct, - has_string_column, func_ctx, other_predicate, ); diff --git a/src/query/storages/fuse/src/statistics/cluster_statistics.rs b/src/query/storages/fuse/src/statistics/cluster_statistics.rs index 01b2316a70e0..0b72a4794aee 100644 --- a/src/query/storages/fuse/src/statistics/cluster_statistics.rs +++ b/src/query/storages/fuse/src/statistics/cluster_statistics.rs @@ -106,7 +106,7 @@ impl ClusterStatsGenerator { if !self.cluster_key_index.is_empty() { let indices = vec![0u32, block.num_rows() as u32 - 1]; - block = block.take(&indices, &mut None)?; + block = block.take(&indices)?; } block = self