diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 28f35b2bded2e..4ad75844f7b76 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -16,14 +16,16 @@ // under the License. use crate::aggregates::group_values::group_column::{ - ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder, + ByteGroupValueBuilder, ByteViewGroupValueBuilder, GroupColumn, + PrimitiveGroupValueBuilder, }; use crate::aggregates::group_values::GroupValues; use ahash::RandomState; use arrow::compute::cast; use arrow::datatypes::{ - Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, - Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + BinaryViewType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, + Int32Type, Int64Type, Int8Type, StringViewType, UInt16Type, UInt32Type, UInt64Type, + UInt8Type, }; use arrow::record_batch::RecordBatch; use arrow_array::{Array, ArrayRef}; @@ -119,6 +121,8 @@ impl GroupValuesColumn { | DataType::LargeBinary | DataType::Date32 | DataType::Date64 + | DataType::Utf8View + | DataType::BinaryView ) } } @@ -184,6 +188,14 @@ impl GroupValues for GroupValuesColumn { let b = ByteGroupValueBuilder::::new(OutputType::Binary); v.push(Box::new(b) as _) } + &DataType::Utf8View => { + let b = ByteViewGroupValueBuilder::::new(); + v.push(Box::new(b) as _) + } + &DataType::BinaryView => { + let b = ByteViewGroupValueBuilder::::new(); + v.push(Box::new(b) as _) + } dt => { return not_impl_err!("{dt} not supported in GroupValuesColumn") } diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 5d00f300e960c..3206af3aa230c 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::make_view; use arrow::array::BufferBuilder; +use arrow::array::ByteView; use arrow::array::GenericBinaryArray; use arrow::array::GenericStringArray; use arrow::array::OffsetSizeTrait; @@ -24,16 +26,23 @@ use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray}; use arrow::buffer::OffsetBuffer; use arrow::buffer::ScalarBuffer; use arrow::datatypes::ByteArrayType; +use arrow::datatypes::ByteViewType; use arrow::datatypes::DataType; use arrow::datatypes::GenericBinaryType; +use arrow_array::GenericByteViewArray; +use arrow_buffer::Buffer; use datafusion_common::utils::proxy::VecAllocExt; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow_array::types::GenericStringType; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; +use std::marker::PhantomData; +use std::mem; use std::sync::Arc; use std::vec; +const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; + /// Trait for storing a single column of group values in [`GroupValuesColumn`] /// /// Implementations of this trait store an in-progress collection of group values @@ -376,6 +385,405 @@ where } } +/// An implementation of [`GroupColumn`] for binary view and utf8 view types. +/// +/// Stores a collection of binary view or utf8 view group values in a buffer +/// whose structure is similar to `GenericByteViewArray`, and we can get benefits: +/// +/// 1. Efficient comparison of incoming rows to existing rows +/// 2. Efficient construction of the final output array +/// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder` +pub struct ByteViewGroupValueBuilder { + /// The views of string values + /// + /// If string len <= 12, the view's format will be: + /// string(12B) | len(4B) + /// + /// If string len > 12, its format will be: + /// offset(4B) | buffer_index(4B) | prefix(4B) | len(4B) + views: Vec, + + /// The progressing block + /// + /// New values will be inserted into it until its capacity + /// is not enough(detail can see `max_block_size`). + in_progress: Vec, + + /// The completed blocks + completed: Vec, + + /// The max size of `in_progress` + /// + /// `in_progress` will be flushed into `completed`, and create new `in_progress` + /// when found its remaining capacity(`max_block_size` - `len(in_progress)`), + /// is no enough to store the appended value. + /// + /// Currently it is fixed at 2MB. + max_block_size: usize, + + /// Nulls + nulls: MaybeNullBufferBuilder, + + /// phantom data so the type requires `` + _phantom: PhantomData, +} + +impl ByteViewGroupValueBuilder { + pub fn new() -> Self { + Self { + views: Vec::new(), + in_progress: Vec::new(), + completed: Vec::new(), + max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE, + nulls: MaybeNullBufferBuilder::new(), + _phantom: PhantomData {}, + } + } + + /// Set the max block size + fn with_max_block_size(mut self, max_block_size: usize) -> Self { + self.max_block_size = max_block_size; + self + } + + fn append_val_inner(&mut self, array: &ArrayRef, row: usize) + where + B: ByteViewType, + { + let arr = array.as_byte_view::(); + + // Null row case, set and return + if arr.is_null(row) { + self.nulls.append(true); + self.views.push(0); + return; + } + + // Not null row case + self.nulls.append(false); + let value: &[u8] = arr.value(row).as_ref(); + + let value_len = value.len(); + let view = if value_len <= 12 { + make_view(value, 0, 0) + } else { + // Ensure big enough block to hold the value firstly + self.ensure_in_progress_big_enough(value_len); + + // Append value + let buffer_index = self.completed.len(); + let offset = self.in_progress.len(); + self.in_progress.extend_from_slice(value); + + make_view(value, buffer_index as u32, offset as u32) + }; + + // Append view + self.views.push(view); + } + + fn ensure_in_progress_big_enough(&mut self, value_len: usize) { + debug_assert!(value_len > 12); + let require_cap = self.in_progress.len() + value_len; + + // If current block isn't big enough, flush it and create a new in progress block + if require_cap > self.max_block_size { + let flushed_block = mem::replace( + &mut self.in_progress, + Vec::with_capacity(self.max_block_size), + ); + let buffer = Buffer::from_vec(flushed_block); + self.completed.push(buffer); + } + } + + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + let array = array.as_byte_view::(); + + // Check if nulls equal firstly + let exist_null = self.nulls.is_null(lhs_row); + let input_null = array.is_null(rhs_row); + if let Some(result) = nulls_equal_to(exist_null, input_null) { + return result; + } + + // Otherwise, we need to check their values + let exist_view = self.views[lhs_row]; + let exist_view_len = exist_view as u32; + + let input_view = array.views()[rhs_row]; + let input_view_len = input_view as u32; + + // The check logic + // - Check len equality + // - If inlined, check inlined value + // - If non-inlined, check prefix and then check value in buffer + // when needed + if exist_view_len != input_view_len { + return false; + } + + if exist_view_len <= 12 { + let exist_inline = unsafe { + GenericByteViewArray::::inline_value( + &exist_view, + exist_view_len as usize, + ) + }; + let input_inline = unsafe { + GenericByteViewArray::::inline_value( + &input_view, + input_view_len as usize, + ) + }; + exist_inline == input_inline + } else { + let exist_prefix = + unsafe { GenericByteViewArray::::inline_value(&exist_view, 4) }; + let input_prefix = + unsafe { GenericByteViewArray::::inline_value(&input_view, 4) }; + + if exist_prefix != input_prefix { + return false; + } + + let exist_full = { + let byte_view = ByteView::from(exist_view); + self.value( + byte_view.buffer_index as usize, + byte_view.offset as usize, + byte_view.length as usize, + ) + }; + let input_full: &[u8] = unsafe { array.value_unchecked(rhs_row).as_ref() }; + exist_full == input_full + } + } + + fn value(&self, buffer_index: usize, offset: usize, length: usize) -> &[u8] { + debug_assert!(buffer_index <= self.completed.len()); + + if buffer_index < self.completed.len() { + let block = &self.completed[buffer_index]; + &block[offset..offset + length] + } else { + &self.in_progress[offset..offset + length] + } + } + + fn build_inner(self) -> ArrayRef { + let Self { + views, + in_progress, + mut completed, + nulls, + .. + } = self; + + // Build nulls + let null_buffer = nulls.build(); + + // Build values + // Flush `in_process` firstly + if !in_progress.is_empty() { + let buffer = Buffer::from(in_progress); + completed.push(buffer); + } + + let views = ScalarBuffer::from(views); + + // Safety: + // * all views were correctly made + // * (if utf8): Input was valid Utf8 so buffer contents are + // valid utf8 as well + unsafe { + Arc::new(GenericByteViewArray::::new_unchecked( + views, + completed, + null_buffer, + )) + } + } + + fn take_n_inner(&mut self, n: usize) -> ArrayRef { + debug_assert!(self.len() >= n); + + // The `n == len` case, we need to take all + if self.len() == n { + let new_builder = Self::new().with_max_block_size(self.max_block_size); + let cur_builder = std::mem::replace(self, new_builder); + return cur_builder.build_inner(); + } + + // The `n < len` case + // Take n for nulls + let null_buffer = self.nulls.take_n(n); + + // Take n for values: + // - Take first n `view`s from `views` + // + // - Find the last non-inlined `view`, if all inlined, + // we can build array and return happily, otherwise we + // we need to continue to process related buffers + // + // - Get the last related `buffer index`(let's name it `buffer index n`) + // from last non-inlined `view` + // + // - Take buffers, the key is that we need to know if we need to take + // the whole last related buffer. The logic is a bit complex, you can + // detail in `take_buffers_with_whole_last`, `take_buffers_with_partial_last` + // and other related steps in following + // + // - Shift the `buffer index` of remaining non-inlined `views` + // + let first_n_views = self.views.drain(0..n).collect::>(); + + let last_non_inlined_view = first_n_views + .iter() + .rev() + .find(|view| ((**view) as u32) > 12); + + if let Some(view) = last_non_inlined_view { + let view = ByteView::from(*view); + let last_related_buffer_index = view.buffer_index as usize; + + // Check should we take the whole `last_related_buffer_index` buffer + let take_whole_last_buffer = self.should_take_whole_buffer( + last_related_buffer_index, + (view.offset + view.length) as usize, + ); + + // Take related buffers + let buffers = if take_whole_last_buffer { + self.take_buffers_with_whole_last(last_related_buffer_index) + } else { + self.take_buffers_with_partial_last( + last_related_buffer_index, + (view.offset + view.length) as usize, + ) + }; + + // Shift `buffer index`s finally + let shifts = if take_whole_last_buffer { + last_related_buffer_index + 1 + } else { + last_related_buffer_index + }; + + self.views.iter_mut().for_each(|view| { + if (*view as u32) > 12 { + let mut byte_view = ByteView::from(*view); + byte_view.buffer_index -= shifts as u32; + *view = byte_view.as_u128(); + } + }); + + // Build array and return + let views = ScalarBuffer::from(first_n_views); + Arc::new(GenericByteViewArray::::new(views, buffers, null_buffer)) + } else { + let views = ScalarBuffer::from(first_n_views); + Arc::new(GenericByteViewArray::::new( + views, + Vec::new(), + null_buffer, + )) + } + } + + fn take_buffers_with_whole_last( + &mut self, + last_related_buffer_index: usize, + ) -> Vec { + if last_related_buffer_index == self.completed.len() { + self.flush_in_progress(); + } + self.completed + .drain(0..last_related_buffer_index + 1) + .collect() + } + + fn take_buffers_with_partial_last( + &mut self, + last_related_buffer_index: usize, + take_len: usize, + ) -> Vec { + let mut take_buffers = Vec::with_capacity(last_related_buffer_index + 1); + + // Take `0 ~ last_related_buffer_index - 1` buffers + if !self.completed.is_empty() || last_related_buffer_index == 0 { + take_buffers.extend(self.completed.drain(0..last_related_buffer_index)); + } + + // Process the `last_related_buffer_index` buffers + let last_buffer = if last_related_buffer_index < self.completed.len() { + // If it is in `completed`, simply clone + self.completed[last_related_buffer_index].clone() + } else { + // If it is `in_progress`, copied `0 ~ offset` part + let taken_last_buffer = self.in_progress[0..take_len].to_vec(); + Buffer::from_vec(taken_last_buffer) + }; + take_buffers.push(last_buffer); + + take_buffers + } + + #[inline] + fn should_take_whole_buffer(&self, buffer_index: usize, take_len: usize) -> bool { + if buffer_index < self.completed.len() { + take_len == self.completed[buffer_index].len() + } else { + take_len == self.in_progress.len() + } + } + + fn flush_in_progress(&mut self) { + let flushed_block = mem::replace( + &mut self.in_progress, + Vec::with_capacity(self.max_block_size), + ); + let buffer = Buffer::from_vec(flushed_block); + self.completed.push(buffer); + } +} + +impl GroupColumn for ByteViewGroupValueBuilder { + fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + self.equal_to_inner(lhs_row, array, rhs_row) + } + + fn append_val(&mut self, array: &ArrayRef, row: usize) { + self.append_val_inner(array, row) + } + + fn len(&self) -> usize { + self.views.len() + } + + fn size(&self) -> usize { + let buffers_size = self + .completed + .iter() + .map(|buf| buf.capacity() * std::mem::size_of::()) + .sum::(); + + self.nulls.allocated_size() + + self.views.capacity() * std::mem::size_of::() + + self.in_progress.capacity() * std::mem::size_of::() + + buffers_size + + std::mem::size_of::() + } + + fn build(self: Box) -> ArrayRef { + Self::build_inner(*self) + } + + fn take_n(&mut self, n: usize) -> ArrayRef { + self.take_n_inner(n) + } +} + /// Determines if the nullability of the existing and new input array can be used /// to short-circuit the comparison of the two values. /// @@ -394,12 +802,17 @@ fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option { mod tests { use std::sync::Arc; - use arrow::datatypes::Int64Type; - use arrow_array::{ArrayRef, Int64Array, StringArray}; + use arrow::{ + array::AsArray, + datatypes::{Int64Type, StringViewType}, + }; + use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; use datafusion_physical_expr::binary_map::OutputType; - use crate::aggregates::group_values::group_column::PrimitiveGroupValueBuilder; + use crate::aggregates::group_values::group_column::{ + ByteViewGroupValueBuilder, PrimitiveGroupValueBuilder, + }; use super::{ByteGroupValueBuilder, GroupColumn}; @@ -579,4 +992,208 @@ mod tests { assert!(!builder.equal_to(4, &input_array, 4)); assert!(builder.equal_to(5, &input_array, 5)); } + + #[test] + fn test_byte_view_append_val() { + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(60); + let builder_array = StringViewArray::from(vec![ + Some("this string is quite long"), // in buffer 0 + Some("foo"), + None, + Some("bar"), + Some("this string is also quite long"), // buffer 0 + Some("this string is quite long"), // buffer 1 + Some("bar"), + ]); + let builder_array: ArrayRef = Arc::new(builder_array); + for row in 0..builder_array.len() { + builder.append_val(&builder_array, row); + } + + let output = Box::new(builder).build(); + // should be 2 output buffers to hold all the data + assert_eq!(output.as_string_view().data_buffers().len(), 2,); + assert_eq!(&output, &builder_array) + } + + #[test] + fn test_byte_view_equal_to() { + // Will cover such cases: + // - exist null, input not null + // - exist null, input null; values not equal + // - exist null, input null; values equal + // - exist not null, input null + // - exist not null, input not null; values not equal + // - exist not null, input not null; values equal + + let mut builder = ByteViewGroupValueBuilder::::new(); + let builder_array = Arc::new(StringViewArray::from(vec![ + None, + None, + None, + Some("foo"), + Some("bar"), + Some("this string is quite long"), + Some("baz"), + ])) as ArrayRef; + builder.append_val(&builder_array, 0); + builder.append_val(&builder_array, 1); + builder.append_val(&builder_array, 2); + builder.append_val(&builder_array, 3); + builder.append_val(&builder_array, 4); + builder.append_val(&builder_array, 5); + builder.append_val(&builder_array, 6); + + // Define input array + let (views, buffer, _nulls) = StringViewArray::from(vec![ + Some("foo"), + Some("bar"), // set to null + Some("this string is quite long"), // set to null + None, + None, + Some("foo"), + Some("baz"), + ]) + .into_parts(); + + // explicitly build a boolean buffer where one of the null values also happens to match + let mut boolean_buffer_builder = BooleanBufferBuilder::new(6); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(false); // this sets Some("bar") to null above + boolean_buffer_builder.append(false); // this sets Some("thisstringisquitelong") to null above + boolean_buffer_builder.append(false); + boolean_buffer_builder.append(false); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(true); + let nulls = NullBuffer::new(boolean_buffer_builder.finish()); + let input_array = + Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as ArrayRef; + + // Check + assert!(!builder.equal_to(0, &input_array, 0)); + assert!(builder.equal_to(1, &input_array, 1)); + assert!(builder.equal_to(2, &input_array, 2)); + assert!(!builder.equal_to(3, &input_array, 3)); + assert!(!builder.equal_to(4, &input_array, 4)); + assert!(!builder.equal_to(5, &input_array, 5)); + assert!(builder.equal_to(6, &input_array, 6)); + } + + #[test] + fn test_byte_view_take_n() { + // ####### Define cases and init ####### + + // `take_n` is really complex, we should consider and test following situations: + // 1. Take nulls + // 2. Take all `inlined`s + // 3. Take non-inlined + partial last buffer in `completed` + // 4. Take non-inlined + whole last buffer in `completed` + // 5. Take non-inlined + partial last `in_progress` + // 6. Take non-inlined + while last buffer in ``in_progress` + // 7. Take all views at once + + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(60); + let input_array = StringViewArray::from(vec![ + // Test situation 1 + None, + None, + // Test situation 2 (also test take null together) + None, + Some("foo"), + Some("bar"), + // Test situation 3 (also test take null + inlined) + None, + Some("foo"), + Some("this string is quite long"), + Some("this string is also quite long"), + // Test situation 4 (also test take null + inlined) + None, + Some("bar"), + Some("this string is quite long"), + // Test situation 5 (also test take null + inlined) + None, + Some("foo"), + Some("another string that is is quite long"), + Some("this string not so long"), + // Test situation 6 (also test take null + inlined + insert again after taking) + None, + Some("bar"), + Some("this string is quite long"), + // Insert 4 and just take 3 to ensure it will go the path of situation 6 + None, + // Finally, we create a new builder, insert the whole array and then + // take whole at once for testing situation 7 + ]); + + let input_array: ArrayRef = Arc::new(input_array); + let first_ones_to_append = 16; // For testing situation 1~5 + let second_ones_to_append = 3; // For testing situation 6 + let final_ones_to_append = input_array.len(); // For testing situation 7 + + // ####### Test situation 1~5 ####### + for row in 0..first_ones_to_append { + builder.append_val(&input_array, row); + } + + assert_eq!(builder.completed.len(), 2); + assert_eq!(builder.in_progress.len(), 59); + + // Situation 1 + let taken_array = builder.take_n(2); + assert_eq!(&taken_array, &input_array.slice(0, 2)); + + // Situation 2 + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(2, 3)); + + // Situation 3 + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(5, 3)); + + let taken_array = builder.take_n(1); + assert_eq!(&taken_array, &input_array.slice(8, 1)); + + // Situation 4 + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(9, 3)); + + // Situation 5 + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(12, 3)); + + let taken_array = builder.take_n(1); + assert_eq!(&taken_array, &input_array.slice(15, 1)); + + // ####### Test situation 6 ####### + assert!(builder.completed.is_empty()); + assert!(builder.in_progress.is_empty()); + assert!(builder.views.is_empty()); + + for row in first_ones_to_append..first_ones_to_append + second_ones_to_append { + builder.append_val(&input_array, row); + } + + assert!(builder.completed.is_empty()); + assert_eq!(builder.in_progress.len(), 25); + + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(16, 3)); + + // ####### Test situation 7 ####### + // Create a new builder + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(60); + + for row in 0..final_ones_to_append { + builder.append_val(&input_array, row); + } + + assert_eq!(builder.completed.len(), 3); + assert_eq!(builder.in_progress.len(), 25); + + let taken_array = builder.take_n(final_ones_to_append); + assert_eq!(&taken_array, &input_array); + } } diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index a80a0891e9770..8202b806a755c 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -5208,3 +5208,65 @@ NULL a 2 statement ok drop table t; + +# test multi group by int + utf8view +statement ok +create table source as values +-- use some strings that are larger than 12 characters as that goes through a different path +(1, 'a'), +(1, 'a'), +(2, 'thisstringislongerthan12'), +(2, 'thisstring'), +(3, 'abc'), +(3, 'cba'), +(2, 'thisstring'), +(null, null), +(null, 'a'), +(null, null), +(null, 'a'), +(2, 'thisstringisalsolongerthan12'), +(2, 'thisstringislongerthan12'), +(1, 'null') +; + +statement ok +create view t as select column1 as a, arrow_cast(column2, 'Utf8View') as b from source; + +query ITI +select a, b, count(*) from t group by a, b order by a, b; +---- +1 a 2 +1 null 1 +2 thisstring 2 +2 thisstringisalsolongerthan12 1 +2 thisstringislongerthan12 2 +3 abc 1 +3 cba 1 +NULL a 2 +NULL NULL 2 + +statement ok +drop view t + +# test with binary view +statement ok +create view t as select column1 as a, arrow_cast(column2, 'BinaryView') as b from source; + +query I?I +select a, b, count(*) from t group by a, b order by a, b; +---- +1 61 2 +1 6e756c6c 1 +2 74686973737472696e67 2 +2 74686973737472696e676973616c736f6c6f6e6765727468616e3132 1 +2 74686973737472696e6769736c6f6e6765727468616e3132 2 +3 616263 1 +3 636261 1 +NULL 61 2 +NULL NULL 2 + +statement ok +drop view t + +statement ok +drop table source;