Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move First Value UDAF and builtin first / last function to aggregate-functions #9960

Merged
merged 40 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
0eaf289
backup
jayzhan211 Apr 3, 2024
5338f61
move PhysicalExpr
jayzhan211 Apr 3, 2024
450ae4b
cleanup
jayzhan211 Apr 3, 2024
3624964
move physical sort
jayzhan211 Apr 3, 2024
835f147
cleanup dependencies
jayzhan211 Apr 3, 2024
c5d80c8
add readme
jayzhan211 Apr 3, 2024
7851de7
disable doc test
jayzhan211 Apr 3, 2024
f5aafb3
move column
jayzhan211 Apr 4, 2024
7bfc074
fmt
jayzhan211 Apr 4, 2024
675d2fe
move aggregatexp
jayzhan211 Apr 4, 2024
5220087
move other two utils
jayzhan211 Apr 4, 2024
113a000
license
jayzhan211 Apr 4, 2024
fea87e3
switch to ignore
jayzhan211 Apr 4, 2024
06d87bc
move reverse order
jayzhan211 Apr 4, 2024
26e5782
rename to common
jayzhan211 Apr 4, 2024
26f852c
cleanup
jayzhan211 Apr 5, 2024
65bf4a1
Merge branch 'physical-expr-core' into move-agg-crate-2
jayzhan211 Apr 5, 2024
2bc58c1
backup
jayzhan211 Apr 5, 2024
ae9db96
Merge branch 'physical-expr-core' into move-agg-crate-2
jayzhan211 Apr 5, 2024
30d5576
move acc to first value
jayzhan211 Apr 5, 2024
672edc7
move builtin expr too
jayzhan211 Apr 5, 2024
109b790
use macro
jayzhan211 Apr 5, 2024
87d589f
fmt
jayzhan211 Apr 5, 2024
398e4e2
fix doc
jayzhan211 Apr 5, 2024
04c7f5e
add todo
jayzhan211 Apr 5, 2024
01a1ddf
rm comments
jayzhan211 Apr 5, 2024
4871414
rm unused
jayzhan211 Apr 5, 2024
1ef212b
rm unused code
jayzhan211 Apr 5, 2024
b6d53a5
change to private
jayzhan211 Apr 5, 2024
9aa15a2
fix lock
jayzhan211 Apr 5, 2024
e90464b
cleanup
jayzhan211 Apr 5, 2024
ece925f
cleanup
jayzhan211 Apr 5, 2024
89ccc89
support roundtrip
jayzhan211 Apr 5, 2024
41a830a
remmove old format state
jayzhan211 Apr 6, 2024
d235d2a
move aggregate related things to aggr crate
jayzhan211 Apr 6, 2024
51cd272
move back to common
jayzhan211 Apr 6, 2024
38b2ce7
taplo
jayzhan211 Apr 6, 2024
9c7767c
rm comment
jayzhan211 Apr 7, 2024
ea4adde
cleanup
jayzhan211 Apr 7, 2024
39c5d15
lock
jayzhan211 Apr 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"datafusion/core",
"datafusion/expr",
"datafusion/execution",
"datafusion/functions-aggregate",
"datafusion/functions",
"datafusion/functions-array",
"datafusion/optimizer",
Expand Down Expand Up @@ -78,6 +79,7 @@ datafusion-common-runtime = { path = "datafusion/common-runtime", version = "37.
datafusion-execution = { path = "datafusion/execution", version = "37.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "37.0.0" }
datafusion-functions = { path = "datafusion/functions", version = "37.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "37.0.0" }
datafusion-functions-array = { path = "datafusion/functions-array", version = "37.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "37.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "37.0.0", default-features = false }
Expand Down
17 changes: 17 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-array = { workspace = true, optional = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
Expand Down
29 changes: 7 additions & 22 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::{
datasource::{provider_as_source, MemTable, TableProvider, ViewTable},
error::{DataFusionError, Result},
execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry},
logical_expr::AggregateUDF,
logical_expr::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
Expand All @@ -53,10 +54,11 @@ use crate::{
optimizer::analyzer::{Analyzer, AnalyzerRule},
optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
physical_plan::{udaf::AggregateUDF, udf::ScalarUDF, ExecutionPlan},
physical_plan::{udf::ScalarUDF, ExecutionPlan},
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
variable::{VarProvider, VarType},
};
use crate::{functions, functions_aggregate, functions_array};

use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
Expand All @@ -69,14 +71,11 @@ use datafusion_common::{
SchemaReference, TableReference,
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::{create_first_value, Signature, Volatility};
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
var_provider::is_system_variables,
Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
};
use datafusion_physical_expr::create_first_value_accumulator;
use datafusion_sql::{
parser::{CopyToSource, CopyToStatement, DFParser},
planner::{object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel},
Expand All @@ -85,7 +84,6 @@ use datafusion_sql::{

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use log::debug;
use parking_lot::RwLock;
use sqlparser::dialect::dialect_from_str;
use url::Url;
Expand Down Expand Up @@ -1452,29 +1450,16 @@ impl SessionState {
};

// register built in functions
datafusion_functions::register_all(&mut new_self)
functions::register_all(&mut new_self)
.expect("can not register built in functions");

// register crate of array expressions (if enabled)
#[cfg(feature = "array_expressions")]
datafusion_functions_array::register_all(&mut new_self)
functions_array::register_all(&mut new_self)
.expect("can not register array expressions");

let first_value = create_first_value(
"FIRST_VALUE",
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
Arc::new(create_first_value_accumulator),
);

match new_self.register_udaf(Arc::new(first_value)) {
Ok(Some(existing_udaf)) => {
debug!("Overwrite existing UDAF: {}", existing_udaf.name());
}
Ok(None) => {}
Err(err) => {
panic!("Failed to register UDAF: {}", err);
}
}
functions_aggregate::register_all(&mut new_self)
.expect("can not register aggregate functions");

new_self
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,11 @@ pub mod functions_array {
pub use datafusion_functions_array::*;
}

/// re-export of [`datafusion_functions_aggregate`] crate
pub mod functions_aggregate {
pub use datafusion_functions_aggregate::*;
}

#[cfg(test)]
pub mod test;
pub mod test_util;
Expand Down
84 changes: 0 additions & 84 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::expr::{
use crate::function::{
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
};
use crate::udaf::format_state_name;
use crate::{
aggregate_function, built_in_function, conditional_expressions::CaseBuilder,
logical_plan::Subquery, AggregateUDF, BuiltinScalarFunction, Expr, LogicalPlan,
Expand Down Expand Up @@ -708,17 +707,6 @@ pub fn create_udaf(
))
}

/// Creates a new UDAF with a specific signature, state type and return type.
/// The signature and state type must match the `Accumulator's implementation`.
/// TOOD: We plan to move aggregate function to its own crate. This function will be deprecated then.
pub fn create_first_value(
name: &str,
signature: Signature,
accumulator: AccumulatorFactoryFunction,
) -> AggregateUDF {
AggregateUDF::from(FirstValue::new(name, signature, accumulator))
}

/// Implements [`AggregateUDFImpl`] for functions that have a single signature and
/// return type.
pub struct SimpleAggregateUDF {
Expand Down Expand Up @@ -813,78 +801,6 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
}
}

pub struct FirstValue {
name: String,
signature: Signature,
accumulator: AccumulatorFactoryFunction,
}

impl Debug for FirstValue {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("FirstValue")
.field("name", &self.name)
.field("signature", &self.signature)
.field("accumulator", &"<FUNC>")
.finish()
}
}

impl FirstValue {
pub fn new(
name: impl Into<String>,
signature: Signature,
accumulator: AccumulatorFactoryFunction,
) -> Self {
let name = name.into();
Self {
name,
signature,
accumulator,
}
}
}

impl AggregateUDFImpl for FirstValue {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
&self.name
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].clone())
}

fn accumulator(
&self,
acc_args: AccumulatorArgs,
) -> Result<Box<dyn crate::Accumulator>> {
(self.accumulator)(acc_args)
}

fn state_fields(
&self,
name: &str,
value_type: DataType,
ordering_fields: Vec<Field>,
) -> Result<Vec<Field>> {
let mut fields = vec![Field::new(
format_state_name(name, "first_value"),
value_type,
true,
)];
fields.extend(ordering_fields);
fields.push(Field::new("is_set", DataType::Boolean, true));
Ok(fields)
}
}

/// Creates a new UDWF with a specific signature, state type and return type.
///
/// The signature and state type must match the [`PartitionEvaluator`]'s implementation`.
Expand Down
7 changes: 1 addition & 6 deletions datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use crate::function::AccumulatorArgs;
use crate::groups_accumulator::GroupsAccumulator;
use crate::utils::format_state_name;
use crate::{Accumulator, Expr};
use crate::{AccumulatorFactoryFunction, ReturnTypeFunction, Signature};
use arrow::datatypes::{DataType, Field};
Expand Down Expand Up @@ -447,9 +448,3 @@ impl AggregateUDFImpl for AggregateUDFLegacyWrapper {
(self.accumulator)(acc_args)
}
}

/// returns the name of the state
/// TODO: Remove duplicated function in physical-expr
pub(crate) fn format_state_name(name: &str, state_name: &str) -> String {
format!("{name}[{state_name}]")
}
5 changes: 5 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,11 @@ pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema {
}
}

/// Build state name. State is the intermidiate state of the aggregate function.
pub fn format_state_name(name: &str, state_name: &str) -> String {
format!("{name}[{state_name}]")
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
44 changes: 44 additions & 0 deletions datafusion/functions-aggregate/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-functions-aggregate"
description = "Aggregate function packages for the DataFusion query engine"
keywords = ["datafusion", "logical", "plan", "expressions"]
readme = "README.md"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
rust-version = { workspace = true }

[lib]
name = "datafusion_functions_aggregate"
path = "src/lib.rs"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
log = { workspace = true }
paste = "1.0.14"
Loading
Loading