Skip to content

Commit

Permalink
rm old workflow
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed May 25, 2024
1 parent 47ae11f commit 25dcb64
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 25 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion datafusion-examples/examples/simplify_udaf_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,13 @@ impl AggregateUDFImpl for BetterAvgUdaf {
true
}

fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
fn create_groups_accumulator(
&self,
_args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
unimplemented!("should not get here");
}

// Override `simplify` to return a new expression that replaces the
// user-defined function call.
fn simplify(&self) -> Option<AggregateFunctionSimplification> {
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/tests/user_defined/user_defined_aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ use datafusion::{
};
use datafusion_common::{assert_contains, cast::as_primitive_array, exec_err};
use datafusion_expr::{
create_udaf,
function::{AccumulatorArgs, GroupsAccumulatorArgs},
AggregateUDFImpl, GroupsAccumulator, SimpleAggregateUDF,
create_udaf, function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator,
SimpleAggregateUDF,
};
use datafusion_physical_expr::expressions::AvgAccumulator;

Expand Down Expand Up @@ -730,7 +729,10 @@ impl AggregateUDFImpl for TestGroupsAccumulator {
true
}

fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
fn create_groups_accumulator(
&self,
_args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
Ok(Box::new(self.clone()))
}
}
Expand Down
15 changes: 11 additions & 4 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,10 +634,14 @@ impl WindowFunctionDefinition {
impl fmt::Display for WindowFunctionDefinition {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
WindowFunctionDefinition::AggregateFunction(fun) => fun.fmt(f),
WindowFunctionDefinition::BuiltInWindowFunction(fun) => fun.fmt(f),
WindowFunctionDefinition::AggregateUDF(fun) => std::fmt::Debug::fmt(fun, f),
WindowFunctionDefinition::WindowUDF(fun) => fun.fmt(f),
WindowFunctionDefinition::AggregateFunction(fun) => {
std::fmt::Display::fmt(fun, f)
}
WindowFunctionDefinition::BuiltInWindowFunction(fun) => {
std::fmt::Display::fmt(fun, f)
}
WindowFunctionDefinition::AggregateUDF(fun) => std::fmt::Display::fmt(fun, f),
WindowFunctionDefinition::WindowUDF(fun) => std::fmt::Display::fmt(fun, f),
}
}
}
Expand Down Expand Up @@ -694,6 +698,9 @@ pub fn find_df_window_func(name: &str) -> Option<WindowFunctionDefinition> {
Some(WindowFunctionDefinition::BuiltInWindowFunction(
built_in_function,
))
// Filter out `sum`: it is provided as a UDAF, so it must not resolve to a built-in aggregate here.
} else if name.as_str() == "sum" {
None
} else if let Ok(aggregate) =
aggregate_function::AggregateFunction::from_str(name.as_str())
{
Expand Down
26 changes: 24 additions & 2 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::type_coercion::binary::get_result_type;
use crate::type_coercion::functions::{
data_types_with_aggregate_udf, data_types_with_scalar_udf,
};
use crate::{utils, LogicalPlan, Projection, Subquery};
use crate::{utils, LogicalPlan, Projection, Subquery, WindowFunctionDefinition};
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{
Expand Down Expand Up @@ -160,7 +160,29 @@ impl ExprSchemable for Expr {
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
fun.return_type(&data_types)
match fun {
WindowFunctionDefinition::AggregateUDF(udf) => {
let new_types = data_types_with_aggregate_udf(&data_types, udf).map_err(|err| {
plan_datafusion_err!(
"{} and {}",
err,
utils::generate_signature_error_msg(
fun.name(),
fun.signature().clone(),
&data_types
)
)
})?;
Ok(fun.return_type(&new_types)?)
}
_ => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
fun.return_type(&data_types)
}
}
}
Expr::AggregateFunction(AggregateFunction { func_def, args, .. }) => {
let data_types = args
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ impl std::hash::Hash for AggregateUDF {
}
}

/// Displays an [`AggregateUDF`] as the string returned by [`AggregateUDF::name`].
impl std::fmt::Display for AggregateUDF {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // The name is emitted verbatim, with no extra decoration.
        f.write_str(self.name())
    }
}

impl AggregateUDF {
/// Create a new AggregateUDF
///
Expand Down
7 changes: 7 additions & 0 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
&fun.signature(),
)?
}
expr::WindowFunctionDefinition::AggregateUDF(udf) => {
coerce_arguments_for_signature_with_aggregate_udf(
args,
self.schema,
udf,
)?
}
_ => args,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3788,7 +3788,10 @@ mod tests {
unimplemented!("not needed for testing")
}

fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
fn create_groups_accumulator(
&self,
_args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
unimplemented!("not needed for testing")
}

Expand Down
15 changes: 4 additions & 11 deletions datafusion/physical-expr/src/aggregate/build_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::sync::Arc;

use arrow::datatypes::Schema;

use datafusion_common::{exec_err, not_impl_err, Result};
use datafusion_common::{exec_err, internal_err, not_impl_err, Result};
use datafusion_expr::AggregateFunction;

use crate::aggregate::regr::RegrType;
Expand Down Expand Up @@ -104,16 +104,9 @@ pub fn create_aggregate_expr(
name,
data_type,
)),
(AggregateFunction::Sum, false) => Arc::new(expressions::Sum::new(
input_phy_exprs[0].clone(),
name,
input_phy_types[0].clone(),
)),
(AggregateFunction::Sum, true) => Arc::new(expressions::DistinctSum::new(
vec![input_phy_exprs[0].clone()],
name,
data_type,
)),
(AggregateFunction::Sum, _) => {
return internal_err!("Builtin Sum will be removed");
}
(AggregateFunction::ApproxDistinct, _) => Arc::new(
expressions::ApproxDistinct::new(input_phy_exprs[0].clone(), name, data_type),
),
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,12 @@ pub fn create_window_expr(
WindowFunctionDefinition::AggregateUDF(fun) => {
// TODO: Ordering not supported for Window UDFs yet
let sort_exprs = &[];
let ordering_req = &[];

let aggregate = udaf::create_aggregate_expr(
fun.as_ref(),
args,
sort_exprs,
ordering_req,
order_by,
input_schema,
name,
ignore_nulls,
Expand Down

0 comments on commit 25dcb64

Please sign in to comment.