Skip to content

Commit

Permalink
introduce expr builder
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed May 17, 2024
1 parent d2fb05e commit fbb87c6
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 15 deletions.
11 changes: 11 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/functions-aggregate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ path = "src/lib.rs"

[dependencies]
arrow = { workspace = true }
concat-idents = "1.1.5"
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
Expand Down
86 changes: 86 additions & 0 deletions datafusion/functions-aggregate/src/expr_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion_expr::{expr::AggregateFunction, Expr};
use sqlparser::ast::NullTreatment;

/// Builder for an [`Expr::AggregateFunction`] backed by an aggregate UDF.
///
/// Create one with [`ExprBuilder::new`] or [`ExprBuilder::new_distinct`],
/// optionally chain `order_by`, `filter` and `null_treatment`, and finish
/// with `build` to obtain the final [`Expr`].
#[derive(Debug, Clone)]
pub struct ExprBuilder {
    /// The aggregate UDF that the built expression invokes
    udf: Arc<crate::AggregateUDF>,
    /// List of expressions to feed to the functions as arguments
    args: Vec<Expr>,
    /// Whether this is a DISTINCT aggregation or not
    distinct: bool,
    /// Optional filter
    filter: Option<Box<Expr>>,
    /// Optional ordering
    order_by: Option<Vec<Expr>>,
    /// Optional null treatment forwarded to the aggregate function
    null_treatment: Option<NullTreatment>,
}

impl ExprBuilder {
    /// Starts a builder for a non-DISTINCT call of `udf` with `args`.
    ///
    /// The filter, ordering and null treatment all start out unset.
    pub fn new(udf: Arc<crate::AggregateUDF>, args: Vec<Expr>) -> Self {
        Self {
            distinct: false,
            filter: None,
            order_by: None,
            null_treatment: None,
            udf,
            args,
        }
    }

    /// Starts a builder for a DISTINCT call of `udf` with `args`.
    ///
    /// Identical to [`ExprBuilder::new`] except that `distinct` is `true`.
    pub fn new_distinct(udf: Arc<crate::AggregateUDF>, args: Vec<Expr>) -> Self {
        Self {
            distinct: true,
            ..Self::new(udf, args)
        }
    }
}

impl ExprBuilder {
    /// Consumes the builder and produces the aggregate function expression.
    pub fn build(self) -> Expr {
        let Self {
            udf,
            args,
            distinct,
            filter,
            order_by,
            null_treatment,
        } = self;

        let aggregate = AggregateFunction::new_udf(
            udf,
            args,
            distinct,
            filter,
            order_by,
            null_treatment,
        );
        Expr::AggregateFunction(aggregate)
    }

    /// Sets the ordering expressions, replacing any previously set ordering.
    pub fn order_by(self, order_by: Vec<Expr>) -> Self {
        Self {
            order_by: Some(order_by),
            ..self
        }
    }

    /// Sets the filter expression, replacing any previously set filter.
    pub fn filter(self, filter: Box<Expr>) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    /// Sets the null treatment, replacing any previously set value.
    pub fn null_treatment(self, null_treatment: NullTreatment) -> Self {
        Self {
            null_treatment: Some(null_treatment),
            ..self
        }
    }
}
2 changes: 2 additions & 0 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ use std::sync::Arc;
// Generates the fluent `first_value(expression, order_by)` function and the
// `first_value_builder(expression)` function for the `FirstValue` UDAF.
// Arguments, in order: UDAF type, expr function name, argument name(s),
// the `order_by` marker selecting the ordered variant of the macro, the doc
// string, and the name of the function returning the UDAF instance.
make_udaf_expr_and_func!(
FirstValue,
first_value,
expression,
order_by,
"Returns the first value in a group of values.",
first_value_udaf
);
Expand Down
6 changes: 6 additions & 0 deletions datafusion/functions-aggregate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
pub mod macros;

pub mod covariance;
pub mod expr_builder;
pub mod first_last;

use datafusion_common::Result;
Expand All @@ -66,8 +67,13 @@ use std::sync::Arc;

/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
    // Functions that directly return an aggregate `Expr`.
    pub use super::covariance::{covar_pop, covar_samp};
    pub use super::first_last::first_value;

    // `*_builder` counterparts generated alongside the functions above.
    pub use super::covariance::{covar_pop_builder, covar_samp_builder};
    pub use super::first_last::first_value_builder;
}

/// Registers all enabled packages with a [`FunctionRegistry`]
Expand Down
73 changes: 61 additions & 12 deletions datafusion/functions-aggregate/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,49 +48,98 @@ macro_rules! make_udaf_expr_and_func {
None,
))
}

create_builder!(
$EXPR_FN,
$($arg)*,
$DOC,
$AGGREGATE_UDF_FN
);

create_func!($UDAF, $AGGREGATE_UDF_FN);
};
($UDAF:ty, $EXPR_FN:ident, $($arg:ident)*, $distinct:ident, $DOC:expr, $AGGREGATE_UDF_FN:ident) => {
($UDAF:ty, $EXPR_FN:ident, $($arg:ident)*, $order_by: ident, $DOC:expr, $AGGREGATE_UDF_FN:ident) => {
// "fluent expr_fn" style function
#[doc = $DOC]
pub fn $EXPR_FN(
$($arg: datafusion_expr::Expr,)*
distinct: bool,
order_by: Option<Vec<datafusion_expr::Expr>>,
) -> datafusion_expr::Expr {
datafusion_expr::Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf(
$AGGREGATE_UDF_FN(),
vec![$($arg),*],
distinct,
false,
None,
order_by,
None,
None
))
}

create_builder!(
$EXPR_FN,
$($arg)*,
$DOC,
$AGGREGATE_UDF_FN
);

create_func!($UDAF, $AGGREGATE_UDF_FN);
};
($UDAF:ty, $EXPR_FN:ident, $DOC:expr, $AGGREGATE_UDF_FN:ident) => {
// "fluent expr_fn" style function
#[doc = $DOC]
pub fn $EXPR_FN(
args: Vec<datafusion_expr::Expr>,
distinct: bool,
filter: Option<Box<datafusion_expr::Expr>>,
order_by: Option<Vec<datafusion_expr::Expr>>,
null_treatment: Option<sqlparser::ast::NullTreatment>
) -> datafusion_expr::Expr {
datafusion_expr::Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf(
$AGGREGATE_UDF_FN(),
args,
distinct,
filter,
order_by,
null_treatment,
false,
None,
None,
None,
))
}

create_builder!(
$EXPR_FN,
$DOC,
$AGGREGATE_UDF_FN
);

create_func!($UDAF, $AGGREGATE_UDF_FN);
};
}

// Generates a `<EXPR_FN>_builder` function that returns a
// `crate::expr_builder::ExprBuilder` for the UDAF obtained by calling
// `$AGGREGATE_UDF_FN()`. The generated function carries `$DOC` as its
// documentation. The function name is formed with `concat_idents!` by
// appending `_builder` to `$EXPR_FN`.
macro_rules! create_builder {
// Arm 1: named arguments. Each `$arg` becomes one `Expr` parameter of the
// generated function, and the parameters are collected into the argument
// vector in the order given.
($EXPR_FN:ident, $($arg:ident)*, $DOC:expr, $AGGREGATE_UDF_FN:ident) => {
concat_idents::concat_idents!(builder_fn_name = $EXPR_FN, _builder {
#[doc = $DOC]
pub fn builder_fn_name(
$($arg: datafusion_expr::Expr,)*
) -> crate::expr_builder::ExprBuilder {
crate::expr_builder::ExprBuilder::new(
$AGGREGATE_UDF_FN(),
vec![$($arg),*],
)
}
});
};

// Arm 2: no named arguments. The generated function takes the whole
// argument list as a single `Vec<Expr>` and passes it through unchanged.
($EXPR_FN:ident, $DOC:expr, $AGGREGATE_UDF_FN:ident) => {
concat_idents::concat_idents!(builder_fn_name = $EXPR_FN, _builder {
#[doc = $DOC]
pub fn builder_fn_name(
args: Vec<datafusion_expr::Expr>,
) -> crate::expr_builder::ExprBuilder {
crate::expr_builder::ExprBuilder::new(
$AGGREGATE_UDF_FN(),
args,
)
}
});
};
}

macro_rules! create_func {
($UDAF:ty, $AGGREGATE_UDF_FN:ident) => {
paste::paste! {
Expand Down
11 changes: 8 additions & 3 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::FunctionRegistry;
use datafusion::functions_aggregate::covariance::{covar_pop, covar_samp};
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::expr_fn::{
covar_pop, covar_pop_builder, covar_samp, covar_samp_builder, first_value,
first_value_builder,
};
use datafusion::prelude::*;
use datafusion::test_util::{TestTableFactory, TestTableProvider};
use datafusion_common::config::{FormatOptions, TableOptions};
Expand Down Expand Up @@ -621,9 +623,12 @@ async fn roundtrip_expr_api() -> Result<()> {
lit(1),
),
array_replace_all(make_array(vec![lit(1), lit(2), lit(3)]), lit(2), lit(4)),
first_value(vec![lit(1)], false, None, None, None),
first_value(lit(1), Some(vec![lit(2)])),
first_value_builder(lit(1)).order_by(vec![lit(3)]).build(),
covar_samp(lit(1.5), lit(2.2)),
covar_samp_builder(lit(1.5), lit(2.3)).build(),
covar_pop(lit(1.5), lit(2.2)),
covar_pop_builder(lit(1.5), lit(2.3)).build(),
];

// ensure expressions created with the expr api can be round tripped
Expand Down

0 comments on commit fbb87c6

Please sign in to comment.