Skip to content

Commit

Permalink
Improve performance for physical plan creation with many columns (apa…
Browse files Browse the repository at this point in the history
…che#12950)

* Add a benchmark for physical plan creation with many aggregates

* Wrap AggregateFunctionExpr with Arc

Patch f5c47fa removed Arc wrappers for AggregateFunctionExpr.
But, it can be inefficient. When physical optimizer decides to replace a node child to other,
it clones the node (with `with_new_children`). Assume, that node is `AggregateExec` than contains
hundreds aggregates and these aggregates are cloned each time.

This patch returns a Arc wrapping to not clone AggregateFunctionExpr itself but clone a pointer.

* Do not build mapping if parent does not require any

This patch adds a small optimization that can soft the edges on
some queries. If there are no parent requirements we do not need to
build column mapping.
  • Loading branch information
askalt authored Oct 18, 2024
1 parent 8c9b915 commit 10af8a7
Show file tree
Hide file tree
Showing 18 changed files with 165 additions and 119 deletions.
14 changes: 14 additions & 0 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

c.bench_function("physical_select_aggregates_from_200", |b| {
let mut aggregates = String::new();
for i in 0..200 {
if i > 0 {
aggregates.push_str(", ");
}
aggregates.push_str(format!("MAX(a{})", i).as_str());
}
let query = format!("SELECT {} FROM t1", aggregates);
b.iter(|| {
physical_plan(&ctx, &query);
});
});

// --- TPC-H ---

let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
/// successfully. Any errors occurring during the conversion process are
/// passed through.
fn try_convert_aggregate_if_better(
aggr_exprs: Vec<AggregateFunctionExpr>,
aggr_exprs: Vec<Arc<AggregateFunctionExpr>>,
prefix_requirement: &[PhysicalSortRequirement],
eq_properties: &EquivalenceProperties,
) -> Result<Vec<AggregateFunctionExpr>> {
) -> Result<Vec<Arc<AggregateFunctionExpr>>> {
aggr_exprs
.into_iter()
.map(|aggr_expr| {
Expand All @@ -154,7 +154,7 @@ fn try_convert_aggregate_if_better(
let reqs = concat_slices(prefix_requirement, &aggr_sort_reqs);
if eq_properties.ordering_satisfy_requirement(&reqs) {
// Existing ordering satisfies the aggregator requirements:
aggr_expr.with_beneficial_ordering(true)?
aggr_expr.with_beneficial_ordering(true)?.map(Arc::new)
} else if eq_properties.ordering_satisfy_requirement(&concat_slices(
prefix_requirement,
&reverse_aggr_req,
Expand All @@ -163,12 +163,14 @@ fn try_convert_aggregate_if_better(
// given the existing ordering (if possible):
aggr_expr
.reverse_expr()
.map(Arc::new)
.unwrap_or(aggr_expr)
.with_beneficial_ordering(true)?
.map(Arc::new)
} else {
// There is no beneficial ordering present -- aggregation
// will still work albeit in a less efficient mode.
aggr_expr.with_beneficial_ordering(false)?
aggr_expr.with_beneficial_ordering(false)?.map(Arc::new)
}
.ok_or_else(|| {
plan_datafusion_err!(
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,7 @@ pub fn create_window_expr(
}

type AggregateExprWithOptionalArgs = (
AggregateFunctionExpr,
Arc<AggregateFunctionExpr>,
// The filter clause, if any
Option<Arc<dyn PhysicalExpr>>,
// Ordering requirements, if any
Expand Down Expand Up @@ -1587,7 +1587,8 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
.alias(name)
.with_ignore_nulls(ignore_nulls)
.with_distinct(*distinct)
.build()?;
.build()
.map(Arc::new)?;

(agg_expr, filter, physical_sort_exprs)
};
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
.schema(Arc::clone(&schema))
.alias("sum1")
.build()
.map(Arc::new)
.unwrap(),
];
let expr = group_by_columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
fn partial_aggregate_exec(
input: Arc<dyn ExecutionPlan>,
group_by: PhysicalGroupBy,
aggr_expr: Vec<AggregateFunctionExpr>,
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
let n_aggr = aggr_expr.len();
Expand All @@ -104,7 +104,7 @@ fn partial_aggregate_exec(
fn final_aggregate_exec(
input: Arc<dyn ExecutionPlan>,
group_by: PhysicalGroupBy,
aggr_expr: Vec<AggregateFunctionExpr>,
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
let n_aggr = aggr_expr.len();
Expand All @@ -130,11 +130,12 @@ fn count_expr(
expr: Arc<dyn PhysicalExpr>,
name: &str,
schema: &Schema,
) -> AggregateFunctionExpr {
) -> Arc<AggregateFunctionExpr> {
AggregateExprBuilder::new(count_udaf(), vec![expr])
.schema(Arc::new(schema.clone()))
.alias(name)
.build()
.map(Arc::new)
.unwrap()
}

Expand Down Expand Up @@ -218,6 +219,7 @@ fn aggregations_with_group_combined() -> datafusion_common::Result<()> {
.schema(Arc::clone(&schema))
.alias("Sum(b)")
.build()
.map(Arc::new)
.unwrap(),
];
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,10 @@ fn test_has_aggregate_expression() -> Result<()> {
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema, vec!["a".to_string()]),
vec![agg.count_expr(&schema)], /* aggr_expr */
vec![None], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */
vec![None], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
Expand Down Expand Up @@ -384,10 +384,10 @@ fn test_has_filter() -> Result<()> {
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![agg.count_expr(&schema)], /* aggr_expr */
vec![filter_expr], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */
vec![filter_expr], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl AggregateFunctionExpr {
/// not implement the method, returns an error. Order insensitive and hard
/// requirement aggregators return `Ok(None)`.
pub fn with_beneficial_ordering(
self,
self: Arc<Self>,
beneficial_ordering: bool,
) -> Result<Option<AggregateFunctionExpr>> {
let Some(updated_fn) = self
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ pub fn map_columns_before_projection(
parent_required: &[Arc<dyn PhysicalExpr>],
proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
) -> Vec<Arc<dyn PhysicalExpr>> {
if parent_required.is_empty() {
// No need to build mapping.
return vec![];
}
let column_mapping = proj_exprs
.iter()
.filter_map(|(expr, name)| {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/window/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
/// See comments on [`WindowExpr`] for more details.
#[derive(Debug)]
pub struct PlainAggregateWindowExpr {
aggregate: AggregateFunctionExpr,
aggregate: Arc<AggregateFunctionExpr>,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
window_frame: Arc<WindowFrame>,
Expand All @@ -50,7 +50,7 @@ pub struct PlainAggregateWindowExpr {
impl PlainAggregateWindowExpr {
/// Create a new aggregate window function expression
pub fn new(
aggregate: AggregateFunctionExpr,
aggregate: Arc<AggregateFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
window_frame: Arc<WindowFrame>,
Expand Down Expand Up @@ -137,14 +137,14 @@ impl WindowExpr for PlainAggregateWindowExpr {
let reverse_window_frame = self.window_frame.reverse();
if reverse_window_frame.start_bound.is_unbounded() {
Arc::new(PlainAggregateWindowExpr::new(
reverse_expr,
Arc::new(reverse_expr),
&self.partition_by.clone(),
&reverse_order_bys(&self.order_by),
Arc::new(self.window_frame.reverse()),
)) as _
} else {
Arc::new(SlidingAggregateWindowExpr::new(
reverse_expr,
Arc::new(reverse_expr),
&self.partition_by.clone(),
&reverse_order_bys(&self.order_by),
Arc::new(self.window_frame.reverse()),
Expand Down
13 changes: 8 additions & 5 deletions datafusion/physical-expr/src/window/sliding_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
/// See comments on [`WindowExpr`] for more details.
#[derive(Debug)]
pub struct SlidingAggregateWindowExpr {
aggregate: AggregateFunctionExpr,
aggregate: Arc<AggregateFunctionExpr>,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
window_frame: Arc<WindowFrame>,
Expand All @@ -50,7 +50,7 @@ pub struct SlidingAggregateWindowExpr {
impl SlidingAggregateWindowExpr {
/// Create a new (sliding) aggregate window function expression.
pub fn new(
aggregate: AggregateFunctionExpr,
aggregate: Arc<AggregateFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
window_frame: Arc<WindowFrame>,
Expand Down Expand Up @@ -121,14 +121,14 @@ impl WindowExpr for SlidingAggregateWindowExpr {
let reverse_window_frame = self.window_frame.reverse();
if reverse_window_frame.start_bound.is_unbounded() {
Arc::new(PlainAggregateWindowExpr::new(
reverse_expr,
Arc::new(reverse_expr),
&self.partition_by.clone(),
&reverse_order_bys(&self.order_by),
Arc::new(self.window_frame.reverse()),
)) as _
} else {
Arc::new(SlidingAggregateWindowExpr::new(
reverse_expr,
Arc::new(reverse_expr),
&self.partition_by.clone(),
&reverse_order_bys(&self.order_by),
Arc::new(self.window_frame.reverse()),
Expand Down Expand Up @@ -159,7 +159,10 @@ impl WindowExpr for SlidingAggregateWindowExpr {
})
.collect::<Vec<_>>();
Some(Arc::new(SlidingAggregateWindowExpr {
aggregate: self.aggregate.with_new_expressions(args, vec![])?,
aggregate: self
.aggregate
.with_new_expressions(args, vec![])
.map(Arc::new)?,
partition_by: partition_bys,
order_by: new_order_by,
window_frame: Arc::clone(&self.window_frame),
Expand Down
24 changes: 12 additions & 12 deletions datafusion/physical-optimizer/src/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
source,
Arc::clone(&schema),
Expand All @@ -321,7 +321,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand All @@ -342,7 +342,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
source,
Arc::clone(&schema),
Expand All @@ -351,7 +351,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand All @@ -371,7 +371,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
source,
Arc::clone(&schema),
Expand All @@ -383,7 +383,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
Expand All @@ -403,7 +403,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
source,
Arc::clone(&schema),
Expand All @@ -415,7 +415,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
Expand Down Expand Up @@ -446,7 +446,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
filter,
Arc::clone(&schema),
Expand All @@ -455,7 +455,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand Down Expand Up @@ -491,7 +491,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
filter,
Arc::clone(&schema),
Expand All @@ -500,7 +500,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr(&schema)],
vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {

type GroupExprsRef<'a> = (
&'a PhysicalGroupBy,
&'a [AggregateFunctionExpr],
&'a [Arc<AggregateFunctionExpr>],
&'a [Option<Arc<dyn PhysicalExpr>>],
);

Expand Down
Loading

0 comments on commit 10af8a7

Please sign in to comment.