Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move First Value UDAF and builtin first / last function to aggregate-functions #9960

Merged
merged 40 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
0eaf289
backup
jayzhan211 Apr 3, 2024
5338f61
move PhysicalExpr
jayzhan211 Apr 3, 2024
450ae4b
cleanup
jayzhan211 Apr 3, 2024
3624964
move physical sort
jayzhan211 Apr 3, 2024
835f147
cleanup dependencies
jayzhan211 Apr 3, 2024
c5d80c8
add readme
jayzhan211 Apr 3, 2024
7851de7
disable doc test
jayzhan211 Apr 3, 2024
f5aafb3
move column
jayzhan211 Apr 4, 2024
7bfc074
fmt
jayzhan211 Apr 4, 2024
675d2fe
move aggregatexp
jayzhan211 Apr 4, 2024
5220087
move other two utils
jayzhan211 Apr 4, 2024
113a000
license
jayzhan211 Apr 4, 2024
fea87e3
switch to ignore
jayzhan211 Apr 4, 2024
06d87bc
move reverse order
jayzhan211 Apr 4, 2024
26e5782
rename to common
jayzhan211 Apr 4, 2024
26f852c
cleanup
jayzhan211 Apr 5, 2024
65bf4a1
Merge branch 'physical-expr-core' into move-agg-crate-2
jayzhan211 Apr 5, 2024
2bc58c1
backup
jayzhan211 Apr 5, 2024
ae9db96
Merge branch 'physical-expr-core' into move-agg-crate-2
jayzhan211 Apr 5, 2024
30d5576
move acc to first value
jayzhan211 Apr 5, 2024
672edc7
move builtin expr too
jayzhan211 Apr 5, 2024
109b790
use macro
jayzhan211 Apr 5, 2024
87d589f
fmt
jayzhan211 Apr 5, 2024
398e4e2
fix doc
jayzhan211 Apr 5, 2024
04c7f5e
add todo
jayzhan211 Apr 5, 2024
01a1ddf
rm comments
jayzhan211 Apr 5, 2024
4871414
rm unused
jayzhan211 Apr 5, 2024
1ef212b
rm unused code
jayzhan211 Apr 5, 2024
b6d53a5
change to private
jayzhan211 Apr 5, 2024
9aa15a2
fix lock
jayzhan211 Apr 5, 2024
e90464b
cleanup
jayzhan211 Apr 5, 2024
ece925f
cleanup
jayzhan211 Apr 5, 2024
89ccc89
support roundtrip
jayzhan211 Apr 5, 2024
41a830a
remove old format state
jayzhan211 Apr 6, 2024
d235d2a
move aggregate related things to aggr crate
jayzhan211 Apr 6, 2024
51cd272
move back to common
jayzhan211 Apr 6, 2024
38b2ce7
taplo
jayzhan211 Apr 6, 2024
9c7767c
rm comment
jayzhan211 Apr 7, 2024
ea4adde
cleanup
jayzhan211 Apr 7, 2024
39c5d15
lock
jayzhan211 Apr 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ bytes = { workspace = true }
bzip2 = { version = "0.4.3", optional = true }
chrono = { workspace = true }
dashmap = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store"] }
datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-array = { workspace = true, optional = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::string::String;
use std::sync::{Arc, Weak};

use super::options::ReadOptions;
use crate::{functions, functions_aggregate, functions_array};
use crate::{
catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA},
catalog::listing_schema::ListingSchemaProvider,
Expand Down Expand Up @@ -58,6 +57,7 @@ use crate::{
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
variable::{VarProvider, VarType},
};
use crate::{functions, functions_aggregate, functions_array};

use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
Expand Down
102 changes: 48 additions & 54 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ use datafusion_common::{
use datafusion_expr::function::AccumulatorArgs;
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{
Accumulator, AccumulatorFactoryFunction, AggregateUDFImpl, Expr, Signature,
Volatility,
};
use datafusion_expr::{Accumulator, AggregateUDFImpl, Expr, Signature, Volatility};
use datafusion_physical_expr_common::aggregate::utils::{
down_cast_any_ref, get_sort_options, ordering_fields,
};
Expand All @@ -48,14 +45,12 @@ make_udaf_function!(
first_value,
value,
"Returns the first value in a group of values.",
first_value_udaf,
create_first_value_accumulator
first_value_udaf
);

/// State held by the `FirstValue` aggregate function, which implements
/// `AggregateUDFImpl` elsewhere in this file.
pub struct FirstValue {
/// Argument signature of the function; `new` builds it with
/// `Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)`.
signature: Signature,
/// Alternative names for the function; `new` initialises this to
/// `["FIRST_VALUE"]`.
aliases: Vec<String>,
/// Factory that `AggregateUDFImpl::accumulator` calls with the
/// `AccumulatorArgs` to build the accumulator.
accumulator: AccumulatorFactoryFunction,
}

impl Debug for FirstValue {
Expand All @@ -68,12 +63,17 @@ impl Debug for FirstValue {
}
}

/// `Default` for `FirstValue` simply delegates to `FirstValue::new`, so a
/// default-constructed value is identical to one built with `new`.
impl Default for FirstValue {
fn default() -> Self {
// Single source of truth for the initial state lives in `new`.
Self::new()
}
}

impl FirstValue {
pub fn new(accumulator: AccumulatorFactoryFunction) -> Self {
pub fn new() -> Self {
Self {
aliases: vec![String::from("FIRST_VALUE")],
signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
accumulator,
}
}
}
Expand All @@ -96,7 +96,45 @@ impl AggregateUDFImpl for FirstValue {
}

fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
(self.accumulator)(acc_args)
let mut all_sort_orders = vec![];

// Construct PhysicalSortExpr objects from Expr objects:
let mut sort_exprs = vec![];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

for expr in acc_args.sort_exprs {
if let Expr::Sort(sort) = expr {
if let Expr::Column(col) = sort.expr.as_ref() {
let name = &col.name;
let e = expressions::column::col(name, acc_args.schema)?;
sort_exprs.push(PhysicalSortExpr {
expr: e,
options: SortOptions {
descending: !sort.asc,
nulls_first: sort.nulls_first,
},
});
}
}
}
if !sort_exprs.is_empty() {
all_sort_orders.extend(sort_exprs);
}

let ordering_req = all_sort_orders;

let ordering_dtypes = ordering_req
.iter()
.map(|e| e.expr.data_type(acc_args.schema))
.collect::<Result<Vec<_>>>()?;

let requirement_satisfied = ordering_req.is_empty();

FirstValueAccumulator::try_new(
acc_args.data_type,
&ordering_dtypes,
ordering_req,
acc_args.ignore_nulls,
)
.map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
}

fn state_fields(
Expand All @@ -120,50 +158,6 @@ impl AggregateUDFImpl for FirstValue {
}
}

/// Builds the accumulator for `first_value` from the supplied
/// [`AccumulatorArgs`].
///
/// Each entry of `acc_args.sort_exprs` that is an `Expr::Sort` wrapping an
/// `Expr::Column` is turned into a [`PhysicalSortExpr`] resolved against
/// `acc_args.schema`; entries of any other shape are skipped. The resulting
/// ordering, together with the data type of every ordering expression, is
/// handed to [`FirstValueAccumulator::try_new`]. The ordering requirement is
/// flagged as already satisfied exactly when no ordering expression was
/// collected.
///
/// # Errors
///
/// Propagates any error raised while resolving a column against the schema,
/// while computing an ordering expression's data type, or by
/// `FirstValueAccumulator::try_new`.
pub(crate) fn create_first_value_accumulator(
    acc_args: AccumulatorArgs,
) -> Result<Box<dyn Accumulator>> {
    // Translate the logical sort expressions into physical ones.
    let mut ordering_req = Vec::new();
    for sort_expr in acc_args.sort_exprs {
        let Expr::Sort(sort) = sort_expr else {
            continue;
        };
        let Expr::Column(column) = sort.expr.as_ref() else {
            continue;
        };
        ordering_req.push(PhysicalSortExpr {
            expr: expressions::column::col(&column.name, acc_args.schema)?,
            options: SortOptions {
                descending: !sort.asc,
                nulls_first: sort.nulls_first,
            },
        });
    }

    // Data types of the ordering expressions, in the same order.
    let mut ordering_dtypes = Vec::with_capacity(ordering_req.len());
    for requirement in &ordering_req {
        ordering_dtypes.push(requirement.expr.data_type(acc_args.schema)?);
    }

    // Captured before `ordering_req` is moved into the accumulator.
    let requirement_satisfied = ordering_req.is_empty();

    let accumulator = FirstValueAccumulator::try_new(
        acc_args.data_type,
        &ordering_dtypes,
        ordering_req,
        acc_args.ignore_nulls,
    )?
    .with_requirement_satisfied(requirement_satisfied);

    Ok(Box::new(accumulator))
}

#[derive(Debug)]
pub struct FirstValueAccumulator {
first: ScalarValue,
Expand Down
7 changes: 2 additions & 5 deletions datafusion/functions-aggregate/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

macro_rules! make_udaf_function {
($UDAF:ty, $EXPR_FN:ident, $($arg:ident)*, $DOC:expr, $AGGREGATE_UDF_FN:ident, $ACCUMULATOR:ident) => {
($UDAF:ty, $EXPR_FN:ident, $($arg:ident)*, $DOC:expr, $AGGREGATE_UDF_FN:ident) => {
paste::paste! {
// "fluent expr_fn" style function
#[doc = $DOC]
Expand Down Expand Up @@ -44,10 +44,7 @@ macro_rules! make_udaf_function {
pub fn $AGGREGATE_UDF_FN() -> std::sync::Arc<datafusion_expr::AggregateUDF> {
[< STATIC_ $UDAF >]
.get_or_init(|| {

let accumulator = std::sync::Arc::new($ACCUMULATOR);
std::sync::Arc::new(datafusion_expr::AggregateUDF::from(<$UDAF>::new(accumulator)))

std::sync::Arc::new(datafusion_expr::AggregateUDF::from(<$UDAF>::default()))
})
.clone()
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ base64 = { version = "0.22", optional = true }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
half = { workspace = true }
hashbrown = { version = "0.14", features = ["raw"] }
Expand Down