Skip to content

Commit

Permalink
filter coalescer
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Zhan <[email protected]>
  • Loading branch information
jayzhan211 committed Nov 16, 2024
1 parent 6d8313e commit 884aafa
Show file tree
Hide file tree
Showing 9 changed files with 1,203 additions and 12 deletions.
12 changes: 12 additions & 0 deletions datafusion/physical-plan/src/coalesce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ impl BatchCoalescer {
}
}

/// Push next batch, and returns [`CoalescerState`] indicating the current
/// state of the buffer.
pub fn push_batch_without_gc(&mut self, batch: RecordBatch) -> CoalescerState {
if self.limit_reached(&batch) {
CoalescerState::LimitReached
} else if self.target_reached(batch) {
CoalescerState::TargetReached
} else {
CoalescerState::Continue
}
}

/// Return true if the there is no data buffered
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl Stream for CoalesceBatchesStream {
/// - updated buffer: `{[4000]}`
/// - next state: `Exhausted`
#[derive(Debug, Clone, Eq, PartialEq)]
enum CoalesceBatchesStreamState {
pub enum CoalesceBatchesStreamState {
/// State to pull a new batch from the input stream.
Pull,
/// State to return a buffered batch.
Expand Down
239 changes: 228 additions & 11 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

mod filter_coalesce;

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -24,34 +26,62 @@ use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::coalesce::{BatchCoalescer, CoalescerState};
use crate::common::can_project;
use crate::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
DisplayFormatType, ExecutionPlan,
};

use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::datatypes::{
DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type,
Int64Type, Int8Type, SchemaRef, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow::record_batch::RecordBatch;
use arrow_array::{BooleanArray, RecordBatchOptions};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_err, plan_err, project_schema, DataFusionError, Result,
};
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::binary_map::OutputType;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{
analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr,
};
use filter_coalesce::{
ByteFilterBuilder, FilterBuilder, FilterCoalescer, PrimitiveFilterBuilder,
};

use crate::execution_plan::CardinalityEffect;
use futures::stream::{Stream, StreamExt};
use log::trace;

/// instantiates a [`PrimitiveFilterBuilder`] and pushes it into $v
///
/// Arguments:
/// `$v`: the vector to push the new builder into
/// `$nullable`: whether the input can contains nulls
/// `$t`: the primitive type of the builder
///
macro_rules! instantiate_primitive {
($v:expr, $nullable:expr, $t:ty) => {
if $nullable {
let b = PrimitiveFilterBuilder::<$t, true>::new();
$v.push(Box::new(b) as _)
} else {
let b = PrimitiveFilterBuilder::<$t, false>::new();
$v.push(Box::new(b) as _)
}
};
}

/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -355,12 +385,62 @@ impl ExecutionPlan for FilterExec {
) -> Result<SendableRecordBatchStream> {
trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);

let len = self.schema().fields().len();
let mut v = Vec::with_capacity(len);
let schema = self.schema();
if filter_coalesce::supported_schema(&schema) {
let nullable = true;
for f in self.schema().fields().iter() {
match *f.data_type() {
DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type),
DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type),
DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type),
DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type),
DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type),
DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type),
DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type),
DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type),
DataType::Float32 => {
instantiate_primitive!(v, nullable, Float32Type)
}
DataType::Float64 => {
instantiate_primitive!(v, nullable, Float64Type)
}
DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type),
DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type),
DataType::Utf8 => {
let b = ByteFilterBuilder::<i32>::new(OutputType::Utf8);
v.push(Box::new(b) as _)
}
DataType::LargeUtf8 => {
let b = ByteFilterBuilder::<i64>::new(OutputType::Utf8);
v.push(Box::new(b) as _)
}
DataType::Binary => {
let b = ByteFilterBuilder::<i32>::new(OutputType::Binary);
v.push(Box::new(b) as _)
}
DataType::LargeBinary => {
let b = ByteFilterBuilder::<i64>::new(OutputType::Binary);
v.push(Box::new(b) as _)
}
_ => todo!(),
}
}
assert!(!v.is_empty());
} else {
assert_eq!(v.len(), 0);
}

Ok(Box::pin(FilterExecStream {
schema: self.schema(),
predicate: Arc::clone(&self.predicate),
input: self.input.execute(partition, context)?,
baseline_metrics,
projection: self.projection.clone(),
coalescer: BatchCoalescer::new(self.schema(), 8192, None),
filter_builder: v,
}))
}

Expand Down Expand Up @@ -434,6 +514,12 @@ struct FilterExecStream {
baseline_metrics: BaselineMetrics,
/// The projection indices of the columns in the input schema
projection: Option<Vec<usize>>,
/// Buffer for combining batches
coalescer: BatchCoalescer,
// The current inner state of the stream. This state dictates the current
// action or operation to be performed in the streaming process.
// inner_state: CoalesceBatchesStreamState,
filter_builder: Vec<Box<dyn FilterCoalescer>>,
}

pub fn batch_filter(
Expand Down Expand Up @@ -476,6 +562,59 @@ fn filter_and_project(
})
}

impl FilterExecStream {
pub fn filter_record_batch_v2(
&mut self,
record_batch: &RecordBatch,
predicate: &BooleanArray,
) -> Result<()> {
let mut filter_builder = FilterBuilder::new(predicate);
if record_batch.num_columns() > 1 {
// Only optimize if filtering more than one column
// Otherwise, the overhead of optimization can be more than the benefit
filter_builder = filter_builder.optimize();
}
let filter = filter_builder.build();

record_batch
.columns()
.iter()
.enumerate()
.map(|(i, a)| self.filter_builder[i].append_filtered_array(a, &filter))
.collect::<Result<Vec<_>, _>>()?;

Ok(())
}

fn emit_if_possible(&mut self) -> Result<Option<RecordBatch>> {
let row_count = self.filter_builder[0].row_count();
if row_count >= 8192 {
return self.emit();
}

Ok(None)
}

fn emit(&mut self) -> Result<Option<RecordBatch>> {
let row_count = self.filter_builder[0].row_count();

let filter_builder = std::mem::take(&mut self.filter_builder);

let filtered_arrays = filter_builder
.into_iter()
.map(|b| b.build())
.collect::<Vec<_>>();

assert_eq!(self.filter_builder.len(), 0);

let options = RecordBatchOptions::default().with_row_count(Some(row_count));
// map arrow error to datafusion error
let r =
RecordBatch::try_new_with_options(self.schema(), filtered_arrays, &options)?;
Ok(Some(r))
}
}

impl Stream for FilterExecStream {
type Item = Result<RecordBatch>;

Expand All @@ -487,23 +626,101 @@ impl Stream for FilterExecStream {
loop {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let filtered_batch = filter_and_project(
&batch,
&self.predicate,
self.projection.as_ref(),
&self.schema,
)?;
timer.done();
// TODO: add timer back, there is mutable borrow issue
// let timer = self.baseline_metrics.elapsed_compute().timer();

let array = self
.predicate
.evaluate(&batch)
.and_then(|v| v.into_array(batch.num_rows()))?;
let filtered_batch =
match (as_boolean_array(&array), self.projection.as_ref()) {
// Apply filter array to record batch
(Ok(filter_array), None) => {
let force_to_old = false;
if force_to_old || self.filter_builder.is_empty() {
Ok(Some(filter_record_batch(&batch, filter_array)?))
} else {
// println!("reach here");
self.filter_record_batch_v2(&batch, filter_array)?;
self.emit_if_possible()
}
}
(Ok(filter_array), Some(projection)) => {
let projected_columns = projection
.iter()
.map(|i| Arc::clone(batch.column(*i)))
.collect();
let projected_batch = RecordBatch::try_new(
Arc::clone(&self.schema),
projected_columns,
)?;
let b =
filter_record_batch(&projected_batch, filter_array)?;
Ok(Some(b))
}
(Err(_), _) => {
internal_err!(
"Cannot create filter_array from non-boolean predicates"
)
}
}?;

// timer.done();

if filtered_batch.is_none() {
continue;
}

let filtered_batch = filtered_batch.unwrap();

// Skip entirely filtered batches
if filtered_batch.num_rows() == 0 {
continue;
}
poll = Poll::Ready(Some(Ok(filtered_batch)));
match self.coalescer.push_batch_without_gc(filtered_batch) {
CoalescerState::Continue => {
let empty_batch = RecordBatch::new_empty(self.schema());
poll = Poll::Ready(Some(Ok(empty_batch)))
}
CoalescerState::LimitReached => {
// Handle the end of the input stream.
poll = if self.coalescer.is_empty() {
// If buffer is empty, return None indicating the stream is fully consumed.
Poll::Ready(None)
} else {
// If the buffer still contains batches, prepare to return them.
let batch = self.coalescer.finish_batch()?;
Poll::Ready(Some(Ok(batch)))
};
}
CoalescerState::TargetReached => {
// Combine buffered batches into one batch and return it.
let batch = self.coalescer.finish_batch()?;
// Set to pull state for the next iteration.
poll = Poll::Ready(Some(Ok(batch)));
}
}
break;
}
value => {
poll = Poll::Ready(value);
if self.filter_builder.is_empty() {
if !self.coalescer.is_empty() {
// Combine buffered batches into one batch and return it.
let batch = self.coalescer.finish_batch()?;
// Set to pull state for the next iteration.
poll = Poll::Ready(Some(Ok(batch)));
} else {
poll = Poll::Ready(value);
}
} else {
if let Some(rb) = self.emit()? {
poll = Poll::Ready(Some(Ok(rb)));
} else {
poll = Poll::Ready(value);
}
}

break;
}
}
Expand Down
Loading

0 comments on commit 884aafa

Please sign in to comment.