diff --git a/datafusion/physical-plan/src/coalesce/mod.rs b/datafusion/physical-plan/src/coalesce/mod.rs index 46875fae94fc..0d23edfaa746 100644 --- a/datafusion/physical-plan/src/coalesce/mod.rs +++ b/datafusion/physical-plan/src/coalesce/mod.rs @@ -124,6 +124,18 @@ impl BatchCoalescer { } } + /// Pushes the next batch into the buffer and returns the [`CoalescerState`] + /// describing the buffer afterwards (limit reached, target reached, or continue). + pub fn push_batch_without_gc(&mut self, batch: RecordBatch) -> CoalescerState { + if self.limit_reached(&batch) { + CoalescerState::LimitReached + } else if self.target_reached(batch) { + CoalescerState::TargetReached + } else { + CoalescerState::Continue + } + } + /// Return true if the there is no data buffered pub fn is_empty(&self) -> bool { self.buffer.is_empty() diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 11678e7a4696..9b96e9496ad5 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -271,7 +271,7 @@ impl Stream for CoalesceBatchesStream { /// - updated buffer: `{[4000]}` /// - next state: `Exhausted` #[derive(Debug, Clone, Eq, PartialEq)] -enum CoalesceBatchesStreamState { +pub enum CoalesceBatchesStreamState { /// State to pull a new batch from the input stream. Pull, /// State to return a buffered batch. diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 07898e8d22d8..37728e17c233 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+mod filter_coalesce; + use std::any::Any; use std::pin::Pin; use std::sync::Arc; @@ -24,6 +26,7 @@ use super::{ ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use crate::coalesce::{BatchCoalescer, CoalescerState}; use crate::common::can_project; use crate::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, @@ -31,8 +34,12 @@ use crate::{ }; use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, SchemaRef}; +use arrow::datatypes::{ + DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, + Int64Type, Int8Type, SchemaRef, UInt16Type, UInt32Type, UInt64Type, UInt8Type, +}; use arrow::record_batch::RecordBatch; +use arrow_array::{BooleanArray, RecordBatchOptions}; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; use datafusion_common::{ @@ -40,6 +47,7 @@ use datafusion_common::{ }; use datafusion_execution::TaskContext; use datafusion_expr::Operator; +use datafusion_physical_expr::binary_map::OutputType; use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::expressions::BinaryExpr; use datafusion_physical_expr::intervals::utils::check_support; @@ -47,11 +55,33 @@ use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, }; +use filter_coalesce::{ + ByteFilterBuilder, FilterBuilder, FilterCoalescer, PrimitiveFilterBuilder, +}; use crate::execution_plan::CardinalityEffect; use futures::stream::{Stream, StreamExt}; use log::trace; +/// Instantiates a [`PrimitiveFilterBuilder`] and pushes it into `$v` +/// +/// Arguments: +/// `$v`: the vector to push the new builder into +/// `$nullable`: whether the input can contain nulls +/// `$t`: the primitive type of the builder +/// +macro_rules! 
instantiate_primitive { + ($v:expr, $nullable:expr, $t:ty) => { + if $nullable { + let b = PrimitiveFilterBuilder::<$t, true>::new(); + $v.push(Box::new(b) as _) + } else { + let b = PrimitiveFilterBuilder::<$t, false>::new(); + $v.push(Box::new(b) as _) + } + }; +} + /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to /// include in its output batches. #[derive(Debug, Clone)] @@ -355,12 +385,62 @@ impl ExecutionPlan for FilterExec { ) -> Result { trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + + let len = self.schema().fields().len(); + let mut v = Vec::with_capacity(len); + let schema = self.schema(); + if filter_coalesce::supported_schema(&schema) { + let nullable = true; + for f in self.schema().fields().iter() { + match *f.data_type() { + DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type), + DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type), + DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type), + DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type), + DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type), + DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type), + DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type), + DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type), + DataType::Float32 => { + instantiate_primitive!(v, nullable, Float32Type) + } + DataType::Float64 => { + instantiate_primitive!(v, nullable, Float64Type) + } + DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type), + DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type), + DataType::Utf8 => { + let b = ByteFilterBuilder::::new(OutputType::Utf8); + v.push(Box::new(b) as _) + } + DataType::LargeUtf8 => { + let b = 
ByteFilterBuilder::::new(OutputType::Utf8); + v.push(Box::new(b) as _) + } + DataType::Binary => { + let b = ByteFilterBuilder::::new(OutputType::Binary); + v.push(Box::new(b) as _) + } + DataType::LargeBinary => { + let b = ByteFilterBuilder::::new(OutputType::Binary); + v.push(Box::new(b) as _) + } + _ => return internal_err!("FilterExec: no filter builder for data type {}", f.data_type()), + } + } + // An empty schema leaves `v` empty, which selects the non-coalescing path. + } else { + assert_eq!(v.len(), 0); + } + Ok(Box::pin(FilterExecStream { schema: self.schema(), predicate: Arc::clone(&self.predicate), input: self.input.execute(partition, context)?, baseline_metrics, projection: self.projection.clone(), + coalescer: BatchCoalescer::new(self.schema(), 8192, None), + filter_builder: v, })) } @@ -434,6 +514,12 @@ struct FilterExecStream { baseline_metrics: BaselineMetrics, /// The projection indices of the columns in the input schema projection: Option>, + /// Buffer for combining batches + coalescer: BatchCoalescer, + // The current inner state of the stream. This state dictates the current + // action or operation to be performed in the streaming process. 
+ // inner_state: CoalesceBatchesStreamState, + filter_builder: Vec>, } pub fn batch_filter( @@ -476,6 +562,59 @@ fn filter_and_project( }) } +impl FilterExecStream { + pub fn filter_record_batch_v2( + &mut self, + record_batch: &RecordBatch, + predicate: &BooleanArray, + ) -> Result<()> { + let mut filter_builder = FilterBuilder::new(predicate); + if record_batch.num_columns() > 1 { + // Only optimize if filtering more than one column + // Otherwise, the overhead of optimization can be more than the benefit + filter_builder = filter_builder.optimize(); + } + let filter = filter_builder.build(); + + record_batch + .columns() + .iter() + .enumerate() + .map(|(i, a)| self.filter_builder[i].append_filtered_array(a, &filter)) + .collect::, _>>()?; + + Ok(()) + } + + fn emit_if_possible(&mut self) -> Result> { + let row_count = self.filter_builder[0].row_count(); + if row_count >= 8192 { + return self.emit(); + } + + Ok(None) + } + + fn emit(&mut self) -> Result> { + let row_count = self.filter_builder[0].row_count(); + + let filter_builder = std::mem::take(&mut self.filter_builder); + + let filtered_arrays = filter_builder + .into_iter() + .map(|b| b.build()) + .collect::>(); + + assert_eq!(self.filter_builder.len(), 0); + + let options = RecordBatchOptions::default().with_row_count(Some(row_count)); + // map arrow error to datafusion error + let r = + RecordBatch::try_new_with_options(self.schema(), filtered_arrays, &options)?; + Ok(Some(r)) + } +} + impl Stream for FilterExecStream { type Item = Result; @@ -487,23 +626,101 @@ impl Stream for FilterExecStream { loop { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { - let timer = self.baseline_metrics.elapsed_compute().timer(); - let filtered_batch = filter_and_project( - &batch, - &self.predicate, - self.projection.as_ref(), - &self.schema, - )?; - timer.done(); + // TODO: add timer back, there is mutable borrow issue + // let timer = self.baseline_metrics.elapsed_compute().timer(); + + let array = 
self + .predicate + .evaluate(&batch) + .and_then(|v| v.into_array(batch.num_rows()))?; + let filtered_batch = + match (as_boolean_array(&array), self.projection.as_ref()) { + // Apply filter array to record batch + (Ok(filter_array), None) => { + let force_to_old = false; + if force_to_old || self.filter_builder.is_empty() { + Ok(Some(filter_record_batch(&batch, filter_array)?)) + } else { + // Append the selected rows straight into the per-column builders. + self.filter_record_batch_v2(&batch, filter_array)?; + self.emit_if_possible() + } + } + (Ok(filter_array), Some(projection)) => { + let projected_columns = projection + .iter() + .map(|i| Arc::clone(batch.column(*i))) + .collect(); + let projected_batch = RecordBatch::try_new( + Arc::clone(&self.schema), + projected_columns, + )?; + let b = + filter_record_batch(&projected_batch, filter_array)?; + Ok(Some(b)) + } + (Err(_), _) => { + internal_err!( + "Cannot create filter_array from non-boolean predicates" + ) + } + }?; + + // timer.done(); + + if filtered_batch.is_none() { + continue; + } + + let filtered_batch = filtered_batch.unwrap(); + // Skip entirely filtered batches if filtered_batch.num_rows() == 0 { continue; } - poll = Poll::Ready(Some(Ok(filtered_batch))); + match self.coalescer.push_batch_without_gc(filtered_batch) { + CoalescerState::Continue => { + // Not enough rows buffered yet: keep pulling instead of emitting an empty batch. + continue; + } + CoalescerState::LimitReached => { + // The coalescer reported that its limit was reached. + poll = if self.coalescer.is_empty() { + // If buffer is empty, return None indicating the stream is fully consumed. + Poll::Ready(None) + } else { + // If the buffer still contains batches, prepare to return them. + let batch = self.coalescer.finish_batch()?; + Poll::Ready(Some(Ok(batch))) + }; + } + CoalescerState::TargetReached => { + // Combine buffered batches into one batch and return it. + let batch = self.coalescer.finish_batch()?; + // Hand the combined batch to the caller. 
+ poll = Poll::Ready(Some(Ok(batch))); + } + } break; } value => { - poll = Poll::Ready(value); + if matches!(value, Some(Err(_))) { + // Surface input errors as-is instead of masking them with a flush. + poll = Poll::Ready(value); + } else if !self.filter_builder.is_empty() { + // Flush the rows accumulated in the per-column builders. + poll = match self.emit()? { + Some(rb) => Poll::Ready(Some(Ok(rb))), + None => Poll::Ready(value), + }; + } else if !self.coalescer.is_empty() { + // Combine buffered batches into one batch and return it. + let batch = self.coalescer.finish_batch()?; + poll = Poll::Ready(Some(Ok(batch))); + } else { + poll = Poll::Ready(value); + } + break; } } } diff --git a/datafusion/physical-plan/src/filter/filter_coalesce.rs b/datafusion/physical-plan/src/filter/filter_coalesce.rs new file mode 100644 index 000000000000..e59417976f47 --- /dev/null +++ b/datafusion/physical-plan/src/filter/filter_coalesce.rs @@ -0,0 +1,393 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +mod bytes; + +use std::sync::Arc; + +use arrow_schema::{DataType, Schema}; +pub(crate) use bytes::ByteFilterBuilder; + +use arrow::{ + array::AsArray, + compute::{prep_null_mask_filter, SlicesIterator}, +}; +use arrow_array::{ + new_empty_array, Array, ArrayRef, ArrowPrimitiveType, BooleanArray, PrimitiveArray, +}; +use arrow_buffer::{bit_iterator::BitIndexIterator, ScalarBuffer}; +use datafusion_common::Result; +use datafusion_expr::sqlparser::keywords::NULL; + +use crate::null_builder::MaybeNullBufferBuilder; + +/// If the filter selects more than this fraction of rows, use +/// [`SlicesIterator`] to copy ranges of values. Otherwise iterate +/// over individual rows using [`IndexIterator`] +/// +/// Threshold of 0.8 chosen based on +/// +const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8; + +/// The iteration strategy used to evaluate [`FilterPredicate`] +#[derive(Debug)] +enum IterationStrategy { + /// A lazily evaluated iterator of ranges + SlicesIterator, + /// A lazily evaluated iterator of indices + IndexIterator, + /// A precomputed list of indices + Indices(Vec), + /// A precomputed array of ranges + Slices(Vec<(usize, usize)>), + /// Select all rows + All, + /// Select no rows + None, +} + +impl IterationStrategy { + /// The default [`IterationStrategy`] for a filter of length `filter_length` + /// and selecting `filter_count` rows + fn default_strategy(filter_length: usize, filter_count: usize) -> Self { + if filter_length == 0 || filter_count == 0 { + return IterationStrategy::None; + } + + if filter_count == filter_length { + return IterationStrategy::All; + } + + // Compute the selectivity of the predicate by dividing the number of true + // bits in the predicate by the predicate's total length + // + // This can then be used as a heuristic for the optimal iteration strategy + let selectivity_frac = filter_count as f64 / filter_length as f64; + if selectivity_frac > FILTER_SLICES_SELECTIVITY_THRESHOLD { + return IterationStrategy::SlicesIterator; 
+ } + IterationStrategy::IndexIterator + } +} + +/// A filtering predicate that can be applied to an [`Array`] +#[derive(Debug)] +pub struct FilterPredicate { + filter: BooleanArray, + count: usize, + strategy: IterationStrategy, +} + +impl FilterPredicate { + // /// Selects rows from `values` based on this [`FilterPredicate`] + // pub fn filter(&self, values: &dyn Array) -> Result { + // filter_array(values, self) + // } + + /// Number of rows being selected based on this [`FilterPredicate`] + pub fn count(&self) -> usize { + self.count + } +} + +pub trait FilterCoalescer: Send + Sync { + fn append_filtered_array( + &mut self, + array: &ArrayRef, + predicate: &FilterPredicate, + ) -> Result<()>; + + fn row_count(&self) -> usize; + fn build(self: Box) -> ArrayRef; +} + +#[derive(Debug)] +pub struct PrimitiveFilterBuilder { + filter_values: Vec, + nulls: MaybeNullBufferBuilder, +} + +impl PrimitiveFilterBuilder +where + T: ArrowPrimitiveType, +{ + pub fn new() -> Self { + Self { + filter_values: vec![], + nulls: MaybeNullBufferBuilder::new(), + } + } +} + +impl FilterCoalescer for PrimitiveFilterBuilder +where + T: ArrowPrimitiveType, +{ + fn append_filtered_array( + &mut self, + array: &ArrayRef, + predicate: &FilterPredicate, + ) -> Result<()> { + let arr = array.as_primitive::(); + let values = arr.values(); + + match &predicate.strategy { + IterationStrategy::SlicesIterator => { + if NULLABLE { + for (start, end) in SlicesIterator::new(&predicate.filter) { + for row in start..end { + if arr.is_valid(row) { + self.filter_values.push(values[row]); + self.nulls.append(false); + } else { + self.filter_values.push(T::default_value()); + self.nulls.append(true); + } + } + } + } else { + for (start, end) in SlicesIterator::new(&predicate.filter) { + self.filter_values.extend(&values[start..end]); + } + } + } + IterationStrategy::Slices(slices) => { + if NULLABLE { + for (start, end) in slices { + let start = *start; + let end = *end; + for row in start..end { + if 
arr.is_valid(row) { + self.filter_values.push(values[row]); + self.nulls.append(false); + } else { + self.filter_values.push(T::default_value()); + self.nulls.append(true); + } + } + } + } else { + for &(start, end) in slices { + self.filter_values.extend(&values[start..end]); + } + } + } + IterationStrategy::IndexIterator => { + if NULLABLE { + for row in IndexIterator::new(&predicate.filter, predicate.count) { + if arr.is_valid(row) { + self.filter_values.push(values[row]); + self.nulls.append(false); + } else { + self.filter_values.push(T::default_value()); + self.nulls.append(true); + } + } + } else { + for row in IndexIterator::new(&predicate.filter, predicate.count) { + self.filter_values.push(values[row]); + } + } + } + IterationStrategy::Indices(indices) => { + if NULLABLE { + for row in indices { + if arr.is_valid(*row) { + self.filter_values.push(values[*row]); + self.nulls.append(false); + } else { + self.filter_values.push(T::default_value()); + self.nulls.append(true); + } + } + } else { + let iter = indices.iter().map(|x| values[*x]); + self.filter_values.extend(iter); + } + } + IterationStrategy::None => {} + IterationStrategy::All => { + for v in arr.iter() { + if let Some(v) = v { + self.filter_values.push(v); + self.nulls.append(false); + } else { + self.filter_values.push(T::default_value()); + self.nulls.append(true); + } + } + } + } + + Ok(()) + } + + fn row_count(&self) -> usize { + self.filter_values.len() + } + + fn build(self: Box) -> ArrayRef { + let Self { + filter_values, + nulls, + } = *self; + + let nulls = nulls.build(); + if !NULLABLE { + assert!(nulls.is_none(), "unexpected nulls in non nullable input"); + } + + Arc::new(PrimitiveArray::::new( + ScalarBuffer::from(filter_values), + nulls, + )) + } +} + +/// An iterator of `usize` whose index in [`BooleanArray`] is true +/// +/// This provides the best performance on most predicates, apart from those which keep +/// large runs and therefore favour 
[`SlicesIterator`] +struct IndexIterator<'a> { + remaining: usize, + iter: BitIndexIterator<'a>, +} + +impl<'a> IndexIterator<'a> { + fn new(filter: &'a BooleanArray, remaining: usize) -> Self { + assert_eq!(filter.null_count(), 0); + let iter = filter.values().set_indices(); + Self { remaining, iter } + } +} + +impl Iterator for IndexIterator<'_> { + type Item = usize; + + fn next(&mut self) -> Option { + if self.remaining != 0 { + // Fascinatingly swapping these two lines around results in a 50% + // performance regression for some benchmarks + let next = self.iter.next().expect("IndexIterator exhausted early"); + self.remaining -= 1; + // Must panic if exhausted early as trusted length iterator + return Some(next); + } + None + } + + fn size_hint(&self) -> (usize, Option) { + (self.remaining, Some(self.remaining)) + } +} + +/// Returns true if every field of the specified schema has a type accepted by [`supported_type`] +pub fn supported_schema(schema: &Schema) -> bool { + schema + .fields() + .iter() + .map(|f| f.data_type()) + .all(supported_type) +} + +/// Returns true if the specified data type can be filtered through a [`FilterCoalescer`] +/// +/// In order to be supported, there must be a specialized implementation of +/// [`FilterCoalescer`] for the data type (primitive and byte builders so far; view types are commented out) +fn supported_type(data_type: &DataType) -> bool { + matches!( + *data_type, + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float32 + | DataType::Float64 + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Binary + | DataType::LargeBinary + | DataType::Date32 + | DataType::Date64 // | DataType::Utf8View + // | DataType::BinaryView + ) +} + +/// A builder to construct [`FilterPredicate`] +#[derive(Debug)] +pub struct FilterBuilder { + filter: BooleanArray, + count: usize, + strategy: 
IterationStrategy, +} + +impl FilterBuilder { + /// Create a new 
[`FilterBuilder`] that can be used to construct a [`FilterPredicate`] + pub fn new(filter: &BooleanArray) -> Self { + let filter = match filter.null_count() { + 0 => filter.clone(), + _ => prep_null_mask_filter(filter), + }; + + let count = filter_count(&filter); + let strategy = IterationStrategy::default_strategy(filter.len(), count); + + Self { + filter, + count, + strategy, + } + } + + /// Compute an optimised representation of the provided `filter` mask that can be + /// applied to an array more quickly. + /// + /// Note: There is limited benefit to calling this to then filter a single array + /// Note: This will likely have a larger memory footprint than the original mask + pub fn optimize(mut self) -> Self { + match self.strategy { + IterationStrategy::SlicesIterator => { + let slices = SlicesIterator::new(&self.filter).collect(); + self.strategy = IterationStrategy::Slices(slices) + } + IterationStrategy::IndexIterator => { + let indices = IndexIterator::new(&self.filter, self.count).collect(); + self.strategy = IterationStrategy::Indices(indices) + } + _ => {} + } + self + } + + /// Construct the final `FilterPredicate` + pub fn build(self) -> FilterPredicate { + FilterPredicate { + filter: self.filter, + count: self.count, + strategy: self.strategy, + } + } +} + +/// Counts the number of set bits in `filter` +fn filter_count(filter: &BooleanArray) -> usize { + filter.values().count_set_bits() +} diff --git a/datafusion/physical-plan/src/filter/filter_coalesce/bytes.rs b/datafusion/physical-plan/src/filter/filter_coalesce/bytes.rs new file mode 100644 index 000000000000..a55200567b35 --- /dev/null +++ b/datafusion/physical-plan/src/filter/filter_coalesce/bytes.rs @@ -0,0 +1,424 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::{ + array::{ArrayDataBuilder, AsArray}, + compute::SlicesIterator, + datatypes::{ByteArrayType, GenericBinaryType, GenericStringType}, +}; +use arrow_array::{ + Array, ArrayRef, GenericBinaryArray, GenericByteArray, GenericStringArray, + OffsetSizeTrait, +}; +use arrow_buffer::{ + bit_util, ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, Buffer, + BufferBuilder, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer, +}; +use arrow_schema::DataType; +use datafusion_common::Result; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls; +use datafusion_physical_expr::binary_map::OutputType; +use datafusion_physical_expr_common::binary_map::INITIAL_BUFFER_CAPACITY; + +use super::{ + FilterCoalescer, FilterPredicate, IndexIterator, IterationStrategy, + MaybeNullBufferBuilder, +}; + +pub struct ByteFilterBuilder +where + O: OffsetSizeTrait, +{ + output_type: OutputType, + buffer: BufferBuilder, + /// Offsets into `buffer` for each appended value. These offsets are used + /// directly to create the final `GenericBinaryArray`. The `i`th string is + /// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values + /// are stored as a zero length string. 
+ offsets: Vec, + nulls: MaybeNullBufferBuilder, +} + +impl ByteFilterBuilder +where + O: OffsetSizeTrait, +{ + pub fn new(output_type: OutputType) -> Self { + Self { + output_type, + buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY), + offsets: vec![O::default()], + nulls: MaybeNullBufferBuilder::new(), + } + } + + fn append_filtered_array( + &mut self, + array: &ArrayRef, + predicate: &FilterPredicate, + ) -> Result<()> + where + B: ByteArrayType, + { + let arr = array.as_bytes::(); + match &predicate.strategy { + IterationStrategy::SlicesIterator => { + for (start, end) in SlicesIterator::new(&predicate.filter) { + for row in start..end { + if arr.is_null(row) { + self.nulls.append(true); + // nulls need a zero length in the offset buffer + let offset = self.buffer.len(); + self.offsets.push(O::usize_as(offset)); + } else { + self.nulls.append(false); + self.do_append_val_inner(arr, row); + } + } + } + } + IterationStrategy::Slices(slices) => { + for (start, end) in slices { + for row in *start..*end { + if arr.is_null(row) { + self.nulls.append(true); + // nulls need a zero length in the offset buffer + let offset = self.buffer.len(); + self.offsets.push(O::usize_as(offset)); + } else { + self.nulls.append(false); + self.do_append_val_inner(arr, row); + } + } + } + } + IterationStrategy::IndexIterator => { + for row in IndexIterator::new(&predicate.filter, predicate.count) { + if arr.is_null(row) { + self.nulls.append(true); + // nulls need a zero length in the offset buffer + let offset = self.buffer.len(); + self.offsets.push(O::usize_as(offset)); + } else { + self.nulls.append(false); + self.do_append_val_inner(arr, row); + } + } + } + IterationStrategy::Indices(indices) => { + for row in indices.iter() { + let row = *row; + if arr.is_null(row) { + self.nulls.append(true); + // nulls need a zero length in the offset buffer + let offset = self.buffer.len(); + self.offsets.push(O::usize_as(offset)); + } else { + self.nulls.append(false); + 
self.do_append_val_inner(arr, row); + } + } + } + IterationStrategy::None => {} + IterationStrategy::All => { + for row in 0..arr.len() { + if arr.is_null(row) { + self.nulls.append(true); + // nulls need a zero length in the offset buffer + let offset = self.buffer.len(); + self.offsets.push(O::usize_as(offset)); + } else { + self.nulls.append(false); + self.do_append_val_inner(arr, row); + } + } + } + } + + Ok(()) + } + + fn do_append_val_inner(&mut self, array: &GenericByteArray, row: usize) + where + B: ByteArrayType, + { + let value: &[u8] = array.value(row).as_ref(); + self.buffer.append_slice(value); + self.offsets.push(O::usize_as(self.buffer.len())); + } +} + +impl FilterCoalescer for ByteFilterBuilder +where + O: OffsetSizeTrait, +{ + fn append_filtered_array( + &mut self, + array: &ArrayRef, + predicate: &FilterPredicate, + ) -> Result<()> { + // Sanity check the array type before dispatching on the output type + match self.output_type { + OutputType::Binary => { + debug_assert!(matches!( + array.data_type(), + DataType::Binary | DataType::LargeBinary + )); + self.append_filtered_array::>(array, predicate) + } + OutputType::Utf8 => { + debug_assert!(matches!( + array.data_type(), + DataType::Utf8 | DataType::LargeUtf8 + )); + self.append_filtered_array::>(array, predicate) + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } + } + + fn row_count(&self) -> usize { + self.offsets.len() - 1 + } + + fn build(self: Box) -> ArrayRef { + let Self { + output_type, + mut buffer, + offsets, + nulls, + } = *self; + + let null_buffer = nulls.build(); + + // SAFETY: the offsets were pushed by `append_filtered_array` / `do_append_val_inner` -- + // monotonically increasing. NOTE(review): no overflow check on `O::usize_as` is visible here -- confirm. 
+ let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) }; + let values = buffer.finish(); + match output_type { + OutputType::Binary => { + // SAFETY: the offsets were constructed correctly + Arc::new(unsafe { + GenericBinaryArray::new_unchecked(offsets, values, null_buffer) + }) + } + OutputType::Utf8 => { + // SAFETY: + // 1. the offsets were constructed safely + // + // 2. the input arrays were all the correct type and thus since + // all the values that went in were valid (e.g. utf8) so are all + // the values that come out + Arc::new(unsafe { + GenericStringArray::new_unchecked(offsets, values, null_buffer) + }) + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } + } +} + +/// [`FilterBytes`] is created from a source [`GenericByteArray`] and can be +/// used to build a new [`GenericByteArray`] by copying values from the source +/// +/// TODO(raphael): Could this be used for the take kernel as well? +struct FilterBytes<'a, OffsetSize> { + src_offsets: &'a [OffsetSize], + src_values: &'a [u8], + dst_offsets: MutableBuffer, + dst_values: MutableBuffer, + cur_offset: OffsetSize, +} + +impl<'a, OffsetSize> FilterBytes<'a, OffsetSize> +where + OffsetSize: OffsetSizeTrait, +{ + fn new(capacity: usize, array: &'a GenericByteArray) -> Self + where + T: ByteArrayType, + { + let num_offsets_bytes = (capacity + 1) * size_of::(); + let mut dst_offsets = MutableBuffer::new(num_offsets_bytes); + let dst_values = MutableBuffer::new(0); + let cur_offset = OffsetSize::from_usize(0).unwrap(); + dst_offsets.push(cur_offset); + + Self { + src_offsets: array.value_offsets(), + src_values: array.value_data(), + dst_offsets, + dst_values, + cur_offset, + } + } + + /// Returns the byte offset at `idx` + #[inline] + fn get_value_offset(&self, idx: usize) -> usize { + self.src_offsets[idx].as_usize() + } + + /// Returns the start and end of the value at index `idx` along with its length + #[inline] + fn get_value_range(&self, idx: usize) 
-> (usize, usize, OffsetSize) { + // These can only fail if `array` contains invalid data + let start = self.get_value_offset(idx); + let end = self.get_value_offset(idx + 1); + let len = OffsetSize::from_usize(end - start).expect("illegal offset range"); + (start, end, len) + } + + /// Extends the in-progress array by the indexes in the provided iterator + fn extend_idx(&mut self, iter: impl Iterator) { + for idx in iter { + let (start, end, len) = self.get_value_range(idx); + self.cur_offset += len; + self.dst_offsets.push(self.cur_offset); + self.dst_values + .extend_from_slice(&self.src_values[start..end]); + } + } + + /// Extends the in-progress array by the ranges in the provided iterator + fn extend_slices(&mut self, iter: impl Iterator) { + for (start, end) in iter { + // These can only fail if `array` contains invalid data + for idx in start..end { + let (_, _, len) = self.get_value_range(idx); + self.cur_offset += len; + self.dst_offsets.push(self.cur_offset); // push_unchecked? + } + + let value_start = self.get_value_offset(start); + let value_end = self.get_value_offset(end); + self.dst_values + .extend_from_slice(&self.src_values[value_start..value_end]); + } + } +} + +/// `filter` implementation for byte arrays +/// +/// Note: NULLs with a non-zero slot length in `array` will have the corresponding +/// data copied across. 
This allows handling the null mask separately from the data +fn filter_bytes( + array: &GenericByteArray, + predicate: &FilterPredicate, +) -> GenericByteArray +where + T: ByteArrayType, +{ + let src_offsets = array.value_offsets(); + let src_values = array.value_data(); + + let mut filter = FilterBytes::new(predicate.count, array); + + match &predicate.strategy { + IterationStrategy::SlicesIterator => { + filter.extend_slices(SlicesIterator::new(&predicate.filter)) + } + IterationStrategy::Slices(slices) => filter.extend_slices(slices.iter().cloned()), + IterationStrategy::IndexIterator => { + filter.extend_idx(IndexIterator::new(&predicate.filter, predicate.count)) + } + IterationStrategy::Indices(indices) => filter.extend_idx(indices.iter().cloned()), + IterationStrategy::All | IterationStrategy::None => unreachable!(), + } + + let mut builder = ArrayDataBuilder::new(T::DATA_TYPE) + .len(predicate.count) + .add_buffer(filter.dst_offsets.into()) + .add_buffer(filter.dst_values.into()); + + if let Some((null_count, nulls)) = filter_null_mask(array.nulls(), predicate) { + builder = builder.null_count(null_count).null_bit_buffer(Some(nulls)); + } + + let data = unsafe { builder.build_unchecked() }; + GenericByteArray::from(data) +} + +/// Computes a new null mask for `data` based on `predicate` +/// +/// If the predicate selected no null-rows, returns `None`, otherwise returns +/// `Some((null_count, null_buffer))` where `null_count` is the number of nulls +/// in the filtered output, and `null_buffer` is the filtered null buffer +/// +fn filter_null_mask( + nulls: Option<&NullBuffer>, + predicate: &FilterPredicate, +) -> Option<(usize, Buffer)> { + let nulls = nulls?; + if nulls.null_count() == 0 { + return None; + } + + let nulls = filter_bits(nulls.inner(), predicate); + // The filtered `nulls` has a length of `predicate.count` bits and + // therefore the null count is this minus the number of valid bits + let null_count = predicate.count - 
nulls.count_set_bits_offset(0, predicate.count); + + if null_count == 0 { + return None; + } + + Some((null_count, nulls)) +} + +// Filter the packed bitmask `buffer`, with `predicate` starting at bit offset `offset` +fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer { + let src = buffer.values(); + let offset = buffer.offset(); + + match &predicate.strategy { + IterationStrategy::IndexIterator => { + let bits = IndexIterator::new(&predicate.filter, predicate.count) + .map(|src_idx| bit_util::get_bit(src, src_idx + offset)); + + // SAFETY: `IndexIterator` reports its size correctly + unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() } + } + IterationStrategy::Indices(indices) => { + let bits = indices + .iter() + .map(|src_idx| bit_util::get_bit(src, *src_idx + offset)); + + // SAFETY: `Vec::iter()` reports its size correctly + unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() } + } + IterationStrategy::SlicesIterator => { + let mut builder = + BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8)); + for (start, end) in SlicesIterator::new(&predicate.filter) { + builder.append_packed_range(start + offset..end + offset, src) + } + builder.into() + } + IterationStrategy::Slices(slices) => { + let mut builder = + BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8)); + for (start, end) in slices { + builder.append_packed_range(*start + offset..*end + offset, src) + } + builder.into() + } + IterationStrategy::All | IterationStrategy::None => unreachable!(), + } +} diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 845a74eaea48..de8485f6169c 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -66,6 +66,7 @@ pub mod joins; pub mod limit; pub mod memory; pub mod metrics; +mod null_builder; pub mod placeholder_row; pub mod projection; pub mod recursive_query; diff --git a/datafusion/physical-plan/src/null_builder.rs 
b/datafusion/physical-plan/src/null_builder.rs new file mode 100644 index 000000000000..a584cf58e50a --- /dev/null +++ b/datafusion/physical-plan/src/null_builder.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; + +/// Builder for an (optional) null mask +/// +/// Optimized for avoid creating the bitmask when all values are non-null +#[derive(Debug)] +pub(crate) enum MaybeNullBufferBuilder { + /// seen `row_count` rows but no nulls yet + NoNulls { row_count: usize }, + /// have at least one null value + /// + /// Note this is an Arrow *VALIDITY* buffer (so it is false for nulls, true + /// for non-nulls) + Nulls(BooleanBufferBuilder), +} + +impl MaybeNullBufferBuilder { + /// Create a new builder + pub fn new() -> Self { + Self::NoNulls { row_count: 0 } + } + + /// Return true if the row at index `row` is null + pub fn is_null(&self, row: usize) -> bool { + match self { + Self::NoNulls { .. 
} => false, + // validity mask means a unset bit is NULL + Self::Nulls(builder) => !builder.get_bit(row), + } + } + + /// Set the nullness of the next row to `is_null` + /// + /// num_values is the current length of the rows being tracked + /// + /// If `value` is true, the row is null. + /// If `value` is false, the row is non null + pub fn append(&mut self, is_null: bool) { + match self { + Self::NoNulls { row_count } if is_null => { + // have seen no nulls so far, this is the first null, + // need to create the nulls buffer for all currently valid values + // alloc 2x the need given we push a new but immediately + let mut nulls = BooleanBufferBuilder::new(*row_count * 2); + nulls.append_n(*row_count, true); + nulls.append(false); + *self = Self::Nulls(nulls); + } + Self::NoNulls { row_count } => { + *row_count += 1; + } + Self::Nulls(builder) => builder.append(!is_null), + } + } + + pub fn append_n(&mut self, n: usize, is_null: bool) { + match self { + Self::NoNulls { row_count } if is_null => { + // have seen no nulls so far, this is the first null, + // need to create the nulls buffer for all currently valid values + // alloc 2x the need given we push a new but immediately + let mut nulls = BooleanBufferBuilder::new(*row_count * 2); + nulls.append_n(*row_count, true); + nulls.append_n(n, false); + *self = Self::Nulls(nulls); + } + Self::NoNulls { row_count } => { + *row_count += n; + } + Self::Nulls(builder) => builder.append_n(n, !is_null), + } + } + + /// return the number of heap allocated bytes used by this structure to store boolean values + pub fn allocated_size(&self) -> usize { + match self { + Self::NoNulls { .. } => 0, + // BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes) + Self::Nulls(builder) => builder.capacity() / 8, + } + } + + /// Return a NullBuffer representing the accumulated nulls so far + pub fn build(self) -> Option { + match self { + Self::NoNulls { .. 
} => None, + Self::Nulls(mut builder) => Some(NullBuffer::from(builder.finish())), + } + } + + /// Returns a NullBuffer representing the first `n` rows accumulated so far + /// shifting any remaining down by `n` + pub fn take_n(&mut self, n: usize) -> Option { + match self { + Self::NoNulls { row_count } => { + *row_count -= n; + None + } + Self::Nulls(builder) => { + // Copy over the values at n..len-1 values to the start of a + // new builder and leave it in self + // + // TODO: it would be great to use something like `set_bits` from arrow here. + let mut new_builder = BooleanBufferBuilder::new(builder.len()); + for i in n..builder.len() { + new_builder.append(builder.get_bit(i)); + } + std::mem::swap(&mut new_builder, builder); + + // take only first n values from the original builder + new_builder.truncate(n); + Some(NullBuffer::from(new_builder.finish())) + } + } + } +} diff --git a/datafusion/sqllogictest/test_files/test1.slt b/datafusion/sqllogictest/test_files/test1.slt new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/datafusion/sqllogictest/test_files/test2.slt b/datafusion/sqllogictest/test_files/test2.slt new file mode 100644 index 000000000000..e631df8cd38d --- /dev/null +++ b/datafusion/sqllogictest/test_files/test2.slt @@ -0,0 +1,11 @@ +statement ok +CREATE EXTERNAL TABLE hits +STORED AS PARQUET +LOCATION '../../benchmarks/data/hits.parquet'; + +query I +SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%'; +---- +15911 + +