Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move First Value UDAF and builtin first / last function to aggregate-functions #9960

Merged
merged 40 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
0eaf289
backup
jayzhan211 Apr 3, 2024
5338f61
move PhysicalExpr
jayzhan211 Apr 3, 2024
450ae4b
cleanup
jayzhan211 Apr 3, 2024
3624964
move physical sort
jayzhan211 Apr 3, 2024
835f147
cleanup dependencies
jayzhan211 Apr 3, 2024
c5d80c8
add readme
jayzhan211 Apr 3, 2024
7851de7
disable doc test
jayzhan211 Apr 3, 2024
f5aafb3
move column
jayzhan211 Apr 4, 2024
7bfc074
fmt
jayzhan211 Apr 4, 2024
675d2fe
move aggregatexp
jayzhan211 Apr 4, 2024
5220087
move other two utils
jayzhan211 Apr 4, 2024
113a000
license
jayzhan211 Apr 4, 2024
fea87e3
switch to ignore
jayzhan211 Apr 4, 2024
06d87bc
move reverse order
jayzhan211 Apr 4, 2024
26e5782
rename to common
jayzhan211 Apr 4, 2024
26f852c
cleanup
jayzhan211 Apr 5, 2024
65bf4a1
Merge branch 'physical-expr-core' into move-agg-crate-2
jayzhan211 Apr 5, 2024
2bc58c1
backup
jayzhan211 Apr 5, 2024
ae9db96
Merge branch 'physical-expr-core' into move-agg-crate-2
jayzhan211 Apr 5, 2024
30d5576
move acc to first value
jayzhan211 Apr 5, 2024
672edc7
move builtin expr too
jayzhan211 Apr 5, 2024
109b790
use macro
jayzhan211 Apr 5, 2024
87d589f
fmt
jayzhan211 Apr 5, 2024
398e4e2
fix doc
jayzhan211 Apr 5, 2024
04c7f5e
add todo
jayzhan211 Apr 5, 2024
01a1ddf
rm comments
jayzhan211 Apr 5, 2024
4871414
rm unused
jayzhan211 Apr 5, 2024
1ef212b
rm unused code
jayzhan211 Apr 5, 2024
b6d53a5
change to private
jayzhan211 Apr 5, 2024
9aa15a2
fix lock
jayzhan211 Apr 5, 2024
e90464b
cleanup
jayzhan211 Apr 5, 2024
ece925f
cleanup
jayzhan211 Apr 5, 2024
89ccc89
support roundtrip
jayzhan211 Apr 5, 2024
41a830a
remmove old format state
jayzhan211 Apr 6, 2024
d235d2a
move aggregate related things to aggr crate
jayzhan211 Apr 6, 2024
51cd272
move back to common
jayzhan211 Apr 6, 2024
38b2ce7
taplo
jayzhan211 Apr 6, 2024
9c7767c
rm comment
jayzhan211 Apr 7, 2024
ea4adde
cleanup
jayzhan211 Apr 7, 2024
39c5d15
lock
jayzhan211 Apr 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::{
datasource::{provider_as_source, MemTable, TableProvider, ViewTable},
error::{DataFusionError, Result},
execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry},
logical_expr::AggregateUDF,
logical_expr::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
Expand All @@ -53,7 +54,7 @@ use crate::{
optimizer::analyzer::{Analyzer, AnalyzerRule},
optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
physical_plan::{udaf::AggregateUDF, udf::ScalarUDF, ExecutionPlan},
physical_plan::{udf::ScalarUDF, ExecutionPlan},
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
variable::{VarProvider, VarType},
};
Expand Down
6 changes: 2 additions & 4 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

//! Defines the FIRST_VALUE/LAST_VALUE aggregations.

use crate::utils::{down_cast_any_ref, get_sort_options, ordering_fields};
use crate::AggregateExpr;
use arrow::array::{ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, SortColumn, SortOptions};
use arrow::datatypes::{DataType, Field};
Expand All @@ -28,10 +30,6 @@ use datafusion_expr::function::AccumulatorArgs;
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{Accumulator, AggregateUDFImpl, Expr, Signature, Volatility};
use datafusion_physical_expr_common::aggregate::utils::{
down_cast_any_ref, get_sort_options, ordering_fields,
};
use datafusion_physical_expr_common::aggregate::AggregateExpr;
use datafusion_physical_expr_common::expressions;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
Expand Down
277 changes: 272 additions & 5 deletions datafusion/functions-aggregate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,27 @@
//!
//! 4. Use the `make_package!` macro to expose the module when the
//! feature is enabled.
use std::sync::Arc;

#[macro_use]
pub mod macros;

use datafusion_common::Result;
pub mod first_last;
pub mod utils;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{not_impl_err, Result};
use datafusion_execution::FunctionRegistry;
use datafusion_expr::AggregateUDF;
use datafusion_expr::{
function::AccumulatorArgs, Accumulator, AggregateUDF, Expr, GroupsAccumulator,
};
use datafusion_physical_expr_common::{
physical_expr::PhysicalExpr,
sort_expr::{LexOrdering, PhysicalSortExpr},
};
use log::debug;

pub mod first_last;
use std::fmt::Debug;
use std::{any::Any, sync::Arc};
use utils::{down_cast_any_ref, ordering_fields};

/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
Expand All @@ -82,3 +92,260 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {

Ok(())
}

/// Creates a physical expression of the UDAF, that includes all necessary type coercion.
/// This function errors when `args`' can't be coerced to a valid argument type of the UDAF.
pub fn create_aggregate_expr(
fun: &AggregateUDF,
input_phy_exprs: &[Arc<dyn PhysicalExpr>],
sort_exprs: &[Expr],
ordering_req: &[PhysicalSortExpr],
schema: &Schema,
name: impl Into<String>,
ignore_nulls: bool,
) -> Result<Arc<dyn AggregateExpr>> {
let input_exprs_types = input_phy_exprs
.iter()
.map(|arg| arg.data_type(schema))
.collect::<Result<Vec<_>>>()?;

let ordering_types = ordering_req
.iter()
.map(|e| e.expr.data_type(schema))
.collect::<Result<Vec<_>>>()?;

let ordering_fields = ordering_fields(ordering_req, &ordering_types);

Ok(Arc::new(AggregateFunctionExpr {
fun: fun.clone(),
args: input_phy_exprs.to_vec(),
data_type: fun.return_type(&input_exprs_types)?,
name: name.into(),
schema: schema.clone(),
sort_exprs: sort_exprs.to_vec(),
ordering_req: ordering_req.to_vec(),
ignore_nulls,
ordering_fields,
}))
}

/// An aggregate expression that:
/// * knows its resulting field
/// * knows how to create its accumulator
/// * knows its accumulator's state's field
/// * knows the expressions from whose its accumulator will receive values
///
/// Any implementation of this trait also needs to implement the
/// `PartialEq<dyn Any>` to allows comparing equality between the
/// trait objects.
pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also move AggregateFunctionExpr and create_aggregate_expr to functions-aggregate from physical-expr-common

While I agree that these two structures are aggregate specific, I still think they should be in datafusion-physical-expr-common because that would allow people to use DataFusion and create their own aggregate functions without having to take DataFusion's built in aggregate functions

I see real value in keeping the APIs required to use datafusion separate from the actual implementations

Copy link
Contributor Author

@jayzhan211 jayzhan211 Apr 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we should avoid the user pulling out unnecessary things for them. I thought they would need builtin functions, but they probably don't. I will keep them in common

/// Returns the aggregate expression as [`Any`] so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// the field of the final result of this aggregation.
fn field(&self) -> Result<Field>;

/// the accumulator used to accumulate values from the expressions.
/// the accumulator expects the same number of arguments as `expressions` and must
/// return states with the same description as `state_fields`
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;

/// the fields that encapsulate the Accumulator's state
/// the number of fields here equals the number of states that the accumulator contains
fn state_fields(&self) -> Result<Vec<Field>>;

/// expressions that are passed to the Accumulator.
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

/// Order by requirements for the aggregate function
/// By default it is `None` (there is no requirement)
/// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this
fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
None
}

/// Human readable name such as `"MIN(c2)"`. The default
/// implementation returns placeholder text.
fn name(&self) -> &str {
"AggregateExpr: default name"
}

/// If the aggregate expression has a specialized
/// [`GroupsAccumulator`] implementation. If this returns true,
/// `[Self::create_groups_accumulator`] will be called.
fn groups_accumulator_supported(&self) -> bool {
false
}

/// Return a specialized [`GroupsAccumulator`] that manages state
/// for all groups.
///
/// For maximum performance, a [`GroupsAccumulator`] should be
/// implemented in addition to [`Accumulator`].
fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet")
}

/// Construct an expression that calculates the aggregate in reverse.
/// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
/// For aggregates that do not support calculation in reverse,
/// returns None (which is the default value).
fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
None
}

/// Creates accumulator implementation that supports retract
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet")
}
}

/// Physical aggregate expression of a UDAF.
#[derive(Debug)]
pub struct AggregateFunctionExpr {
fun: AggregateUDF,
args: Vec<Arc<dyn PhysicalExpr>>,
/// Output / return type of this aggregate
data_type: DataType,
name: String,
schema: Schema,
// The logical order by expressions
sort_exprs: Vec<Expr>,
// The physical order by expressions
ordering_req: LexOrdering,
ignore_nulls: bool,
ordering_fields: Vec<Field>,
}

impl AggregateFunctionExpr {
/// Return the `AggregateUDF` used by this `AggregateFunctionExpr`
pub fn fun(&self) -> &AggregateUDF {
&self.fun
}
}

impl AggregateExpr for AggregateFunctionExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
self.args.clone()
}

fn state_fields(&self) -> Result<Vec<Field>> {
self.fun.state_fields(
self.name(),
self.data_type.clone(),
self.ordering_fields.clone(),
)
}

fn field(&self) -> Result<Field> {
Ok(Field::new(&self.name, self.data_type.clone(), true))
}

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
let acc_args = AccumulatorArgs::new(
&self.data_type,
&self.schema,
self.ignore_nulls,
&self.sort_exprs,
);

self.fun.accumulator(acc_args)
}

fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
let accumulator = self.create_accumulator()?;

// Accumulators that have window frame startings different
// than `UNBOUNDED PRECEDING`, such as `1 PRECEEDING`, need to
// implement retract_batch method in order to run correctly
// currently in DataFusion.
//
// If this `retract_batches` is not present, there is no way
// to calculate result correctly. For example, the query
//
// ```sql
// SELECT
// SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS sum_a
// FROM
// t
// ```
//
// 1. First sum value will be the sum of rows between `[0, 1)`,
//
// 2. Second sum value will be the sum of rows between `[0, 2)`
//
// 3. Third sum value will be the sum of rows between `[1, 3)`, etc.
//
// Since the accumulator keeps the running sum:
//
// 1. First sum we add to the state sum value between `[0, 1)`
//
// 2. Second sum we add to the state sum value between `[1, 2)`
// (`[0, 1)` is already in the state sum, hence running sum will
// cover `[0, 2)` range)
//
// 3. Third sum we add to the state sum value between `[2, 3)`
// (`[0, 2)` is already in the state sum). Also we need to
// retract values between `[0, 1)` by this way we can obtain sum
// between [1, 3) which is indeed the apropriate range.
//
// When we use `UNBOUNDED PRECEDING` in the query starting
// index will always be 0 for the desired range, and hence the
// `retract_batch` method will not be called. In this case
// having retract_batch is not a requirement.
//
// This approach is a a bit different than window function
// approach. In window function (when they use a window frame)
// they get all the desired range during evaluation.
if !accumulator.supports_retract_batch() {
return not_impl_err!(
"Aggregate can not be used as a sliding accumulator because \
`retract_batch` is not implemented: {}",
self.name
);
}
Ok(accumulator)
}

fn name(&self) -> &str {
&self.name
}

fn groups_accumulator_supported(&self) -> bool {
self.fun.groups_accumulator_supported()
}

fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
self.fun.create_groups_accumulator()
}

fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
(!self.ordering_req.is_empty()).then_some(&self.ordering_req)
}
}

impl PartialEq<dyn Any> for AggregateFunctionExpr {
fn eq(&self, other: &dyn Any) -> bool {
down_cast_any_ref(other)
.downcast_ref::<Self>()
.map(|x| {
self.name == x.name
&& self.data_type == x.data_type
&& self.fun == x.fun
&& self.args.len() == x.args.len()
&& self
.args
.iter()
.zip(x.args.iter())
.all(|(this_arg, other_arg)| this_arg.eq(other_arg))
})
.unwrap_or(false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ use arrow::{
compute::SortOptions,
datatypes::{DataType, Field},
};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

use crate::sort_expr::PhysicalSortExpr;

use super::AggregateExpr;
use crate::AggregateExpr;

/// Downcast a `Box<dyn AggregateExpr>` or `Arc<dyn AggregateExpr>`
/// and return the inner trait object as [`Any`] so
Expand Down
Loading
Loading