Skip to content

Commit

Permalink
backup
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed Nov 13, 2024
1 parent 9476c64 commit c6cbfd3
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 1 deletion.
58 changes: 57 additions & 1 deletion datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

mod filter_coalesce;

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -32,9 +34,12 @@ use crate::{
DisplayFormatType, ExecutionPlan,
};

use arrow::compute::filter_record_batch;
use arrow::array::AsArray;
use arrow::compute::{filter_record_batch, SlicesIterator};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, BooleanArray};
use arrow_buffer::bit_iterator::BitIndexIterator;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::stats::Precision;
use datafusion_common::{
Expand All @@ -49,11 +54,31 @@ use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{
analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr,
};
use filter_coalesce::{FilterCoalescer, PrimitiveFilterBuilder};

use crate::execution_plan::CardinalityEffect;
use futures::stream::{Stream, StreamExt};
use log::trace;

/// Creates a [`PrimitiveFilterBuilder`] for the primitive type `$t`, boxes it,
/// and pushes it onto `$v`.
///
/// Arguments:
/// * `$v`: the vector that receives the new boxed builder
/// * `$nullable`: whether the input can contain nulls; picks the builder's
///   `NULLABLE` const parameter (`true` / `false`)
/// * `$t`: the primitive type of the builder
///
macro_rules! instantiate_primitive {
    ($v:expr, $nullable:expr, $t:ty) => {
        if $nullable {
            $v.push(Box::new(PrimitiveFilterBuilder::<$t, true>::new()) as _)
        } else {
            $v.push(Box::new(PrimitiveFilterBuilder::<$t, false>::new()) as _)
        }
    };
}

/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -357,13 +382,43 @@ impl ExecutionPlan for FilterExec {
) -> Result<SendableRecordBatchStream> {
trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);

let len = self.schema().fields().len();
let mut v = Vec::with_capacity(len);
for f in self.schema().fields().iter() {
match f.data_type() {
&DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type),
&DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type),
&DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type),
&DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type),
&DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type),
&DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type),
&DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type),
&DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type),
&DataType::Float32 => {
instantiate_primitive!(v, nullable, Float32Type)
}
&DataType::Float64 => {
instantiate_primitive!(v, nullable, Float64Type)
}
&DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type),
&DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type),
&DataType::Utf8 => {
let b = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
v.push(Box::new(b) as _)
}
_ => todo!(),
}
}

Ok(Box::pin(FilterExecStream {
schema: self.schema(),
predicate: Arc::clone(&self.predicate),
input: self.input.execute(partition, context)?,
baseline_metrics,
projection: self.projection.clone(),
coalescer: BatchCoalescer::new(self.schema(), 8192, None),
filter_builder: v,
}))
}

Expand Down Expand Up @@ -442,6 +497,7 @@ struct FilterExecStream {
// The current inner state of the stream. This state dictates the current
// action or operation to be performed in the streaming process.
// inner_state: CoalesceBatchesStreamState,
filter_builder: Vec<Box<dyn FilterCoalescer>>,
}

pub fn batch_filter(
Expand Down
159 changes: 159 additions & 0 deletions datafusion/physical-plan/src/filter/filter_coalesce.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::AsArray;
use arrow::compute::SlicesIterator;
use arrow_array::{ArrayRef, ArrowPrimitiveType, BooleanArray};
use arrow_buffer::bit_iterator::BitIndexIterator;
use datafusion_common::Result;

/// The iteration strategy used to evaluate a [`FilterPredicate`]
///
/// Each variant describes how the selected rows of a filter are enumerated:
/// either lazily from the boolean mask itself, or from a list that was
/// computed ahead of time and is stored in the variant.
#[derive(Debug)]
enum IterationStrategy {
/// A lazily evaluated iterator of ranges
SlicesIterator,
/// A lazily evaluated iterator of indices
IndexIterator,
/// A precomputed list of indices
Indices(Vec<usize>),
/// A precomputed array of ranges, stored as `(start, end)` pairs that are
/// applied as the half-open range `start..end`
Slices(Vec<(usize, usize)>),
/// Select all rows
All,
/// Select no rows
None,
}

/// A filtering predicate that can be applied to an [`arrow_array::Array`]
///
/// Bundles a boolean mask with the number of selected rows and the
/// [`IterationStrategy`] that should be used to walk the mask.
///
/// NOTE(review): all fields are private and this file defines no constructor,
/// so a `FilterPredicate` cannot currently be built from outside this module.
#[derive(Debug)]
pub struct FilterPredicate {
/// The boolean mask that the lazy iteration strategies read from
filter: BooleanArray,
/// Number of rows being selected by this predicate
count: usize,
/// How the selected rows should be enumerated
strategy: IterationStrategy,
}

impl FilterPredicate {
// Disabled for now: `filter_array` is not defined in this module.
// /// Selects rows from `values` based on this [`FilterPredicate`]
// pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
// filter_array(values, self)
// }

/// Number of rows being selected based on this [`FilterPredicate`]
///
/// Returns the stored `count` field as-is; nothing is recomputed from the
/// mask.
pub fn count(&self) -> usize {
self.count
}
}

/// A per-column sink that accumulates the rows selected by a filter.
///
/// Implementors keep internal state (hence `&mut self`) and must be
/// `Send + Sync`.
///
/// NOTE(review): the trait currently has no method for retrieving what has
/// been accumulated.
pub trait FilterCoalescer: Send + Sync {
/// Appends the rows of `array` that are selected by `predicate` to this
/// coalescer's internal state.
fn append_filtered_array(
&mut self,
array: &ArrayRef,
predicate: &FilterPredicate,
) -> Result<()>;
}

/// Accumulates the filtered native values of primitive arrays of type `T`.
///
/// NOTE(review): `NULLABLE` is not read anywhere in this file yet, and the
/// null-tracking field below is still commented out, so only values (not
/// validity) are stored.
#[derive(Debug)]
pub struct PrimitiveFilterBuilder<T: ArrowPrimitiveType, const NULLABLE: bool> {
/// Native values of every row appended so far, in append order
filter_values: Vec<T::Native>,
// nulls: MaybeNullBufferBuilder,
}

impl<T: ArrowPrimitiveType, const NULLABLE: bool> PrimitiveFilterBuilder<T, NULLABLE> {
    /// Creates a builder that holds no values yet.
    pub fn new() -> Self {
        let filter_values = Vec::new();
        Self { filter_values }
    }
}

impl<T, const NULLABLE: bool> FilterCoalescer for PrimitiveFilterBuilder<T, NULLABLE>
where
    T: ArrowPrimitiveType,
{
    /// Appends the values of `array` selected by `predicate` to this builder.
    ///
    /// `array` is downcast to a primitive array of type `T` and only its value
    /// buffer is read. Every [`IterationStrategy`] is handled:
    /// `All` copies every value, `None` copies nothing, and the remaining
    /// strategies copy the ranges / indices they describe.
    ///
    /// NOTE(review): the validity (null) buffer of `array` is not consulted and
    /// `NULLABLE` has no effect here, so null slots are appended as whatever
    /// value the underlying buffer holds.
    ///
    /// # Panics
    ///
    /// * if `array` cannot be downcast to a primitive array of type `T`
    /// * if a range or index produced by the strategy is out of bounds for
    ///   `array`
    /// * for `IterationStrategy::IndexIterator`, if `predicate.filter` contains
    ///   nulls or has fewer set bits than `predicate.count`
    fn append_filtered_array(
        &mut self,
        array: &ArrayRef,
        predicate: &FilterPredicate,
    ) -> Result<()> {
        let values = array.as_primitive::<T>().values();

        match &predicate.strategy {
            // Nothing is selected, so there is nothing to append.
            IterationStrategy::None => {}
            // Everything is selected: copy the whole value buffer in one go.
            IterationStrategy::All => {
                self.filter_values.extend_from_slice(&values[..]);
            }
            IterationStrategy::SlicesIterator => {
                for (start, end) in SlicesIterator::new(&predicate.filter) {
                    self.filter_values.extend_from_slice(&values[start..end]);
                }
            }
            IterationStrategy::Slices(slices) => {
                for &(start, end) in slices {
                    self.filter_values.extend_from_slice(&values[start..end]);
                }
            }
            IterationStrategy::IndexIterator => {
                let selected = IndexIterator::new(&predicate.filter, predicate.count);
                self.filter_values.extend(selected.map(|idx| values[idx]));
            }
            IterationStrategy::Indices(indices) => {
                self.filter_values
                    .extend(indices.iter().map(|&idx| values[idx]));
            }
        }

        Ok(())
    }
}

/// An iterator of `usize` whose index in [`BooleanArray`] is true
///
/// This provides the best performance on most predicates, apart from those which keep
/// large runs and therefore favour [`SlicesIterator`]
struct IndexIterator<'a> {
/// Number of indices still to be yielded; also reported by `size_hint`
remaining: usize,
/// Underlying iterator over the set bits of the filter's value buffer
iter: BitIndexIterator<'a>,
}

impl<'a> IndexIterator<'a> {
    /// Builds an iterator over the set bits of `filter`.
    ///
    /// `remaining` is the number of indices the iterator is expected to yield.
    ///
    /// # Panics
    ///
    /// Panics if `filter` contains any nulls.
    fn new(filter: &'a BooleanArray, remaining: usize) -> Self {
        assert_eq!(filter.null_count(), 0);
        Self {
            remaining,
            iter: filter.values().set_indices(),
        }
    }
}

impl Iterator for IndexIterator<'_> {
type Item = usize;

/// Yields the next set-bit index while `remaining` is non-zero, then `None`.
///
/// # Panics
///
/// Panics with "IndexIterator exhausted early" if the underlying bit
/// iterator runs out before `remaining` indices have been produced.
fn next(&mut self) -> Option<Self::Item> {
if self.remaining != 0 {
// Fascinatingly swapping these two lines around results in a 50%
// performance regression for some benchmarks
let next = self.iter.next().expect("IndexIterator exhausted early");
self.remaining -= 1;
// Must panic if exhausted early as trusted length iterator
return Some(next);
}
None
}

/// Reports an exact length: both bounds are the `remaining` counter.
fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining, Some(self.remaining))
}
}

0 comments on commit c6cbfd3

Please sign in to comment.