Skip to content

Commit

Permalink
Move First Value UDAF and builtin first / last function to `aggregate…
Browse files Browse the repository at this point in the history
…-functions` (#9960)

* backup

Signed-off-by: jayzhan211 <[email protected]>

* move PhysicalExpr

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* move physical sort

Signed-off-by: jayzhan211 <[email protected]>

* cleanup dependencies

Signed-off-by: jayzhan211 <[email protected]>

* add readme

Signed-off-by: jayzhan211 <[email protected]>

* disable doc test

Signed-off-by: jayzhan211 <[email protected]>

* move column

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* move aggregatexp

Signed-off-by: jayzhan211 <[email protected]>

* move other two utils

Signed-off-by: jayzhan211 <[email protected]>

* license

Signed-off-by: jayzhan211 <[email protected]>

* switch to ignore

Signed-off-by: jayzhan211 <[email protected]>

* move reverse order

Signed-off-by: jayzhan211 <[email protected]>

* rename to common

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* backup

Signed-off-by: jayzhan211 <[email protected]>

* move acc to first value

Signed-off-by: jayzhan211 <[email protected]>

* move builtin expr too

Signed-off-by: jayzhan211 <[email protected]>

* use macro

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* fix doc

Signed-off-by: jayzhan211 <[email protected]>

* add todo

Signed-off-by: jayzhan211 <[email protected]>

* rm comments

Signed-off-by: jayzhan211 <[email protected]>

* rm unused

Signed-off-by: jayzhan211 <[email protected]>

* rm unused code

Signed-off-by: jayzhan211 <[email protected]>

* change to private

Signed-off-by: jayzhan211 <[email protected]>

* fix lock

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* support roundtrip

Signed-off-by: jayzhan211 <[email protected]>

* remmove old format state

Signed-off-by: jayzhan211 <[email protected]>

* move aggregate related things to aggr crate

Signed-off-by: jayzhan211 <[email protected]>

* move back to common

Signed-off-by: jayzhan211 <[email protected]>

* taplo

Signed-off-by: jayzhan211 <[email protected]>

* rm comment

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* lock

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 authored Apr 7, 2024
1 parent 98ba11f commit 2f1c3ab
Show file tree
Hide file tree
Showing 23 changed files with 720 additions and 570 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"datafusion/core",
"datafusion/expr",
"datafusion/execution",
"datafusion/functions-aggregate",
"datafusion/functions",
"datafusion/functions-array",
"datafusion/optimizer",
Expand Down Expand Up @@ -78,6 +79,7 @@ datafusion-common-runtime = { path = "datafusion/common-runtime", version = "37.
datafusion-execution = { path = "datafusion/execution", version = "37.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "37.0.0" }
datafusion-functions = { path = "datafusion/functions", version = "37.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "37.0.0" }
datafusion-functions-array = { path = "datafusion/functions-array", version = "37.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "37.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "37.0.0", default-features = false }
Expand Down
17 changes: 17 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-array = { workspace = true, optional = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
Expand Down
29 changes: 7 additions & 22 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::{
datasource::{provider_as_source, MemTable, TableProvider, ViewTable},
error::{DataFusionError, Result},
execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry},
logical_expr::AggregateUDF,
logical_expr::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
Expand All @@ -53,10 +54,11 @@ use crate::{
optimizer::analyzer::{Analyzer, AnalyzerRule},
optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
physical_plan::{udaf::AggregateUDF, udf::ScalarUDF, ExecutionPlan},
physical_plan::{udf::ScalarUDF, ExecutionPlan},
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
variable::{VarProvider, VarType},
};
use crate::{functions, functions_aggregate, functions_array};

use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
Expand All @@ -69,14 +71,11 @@ use datafusion_common::{
SchemaReference, TableReference,
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::{create_first_value, Signature, Volatility};
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
var_provider::is_system_variables,
Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
};
use datafusion_physical_expr::create_first_value_accumulator;
use datafusion_sql::{
parser::{CopyToSource, CopyToStatement, DFParser},
planner::{object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel},
Expand All @@ -85,7 +84,6 @@ use datafusion_sql::{

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use log::debug;
use parking_lot::RwLock;
use sqlparser::dialect::dialect_from_str;
use url::Url;
Expand Down Expand Up @@ -1452,29 +1450,16 @@ impl SessionState {
};

// register built in functions
datafusion_functions::register_all(&mut new_self)
functions::register_all(&mut new_self)
.expect("can not register built in functions");

// register crate of array expressions (if enabled)
#[cfg(feature = "array_expressions")]
datafusion_functions_array::register_all(&mut new_self)
functions_array::register_all(&mut new_self)
.expect("can not register array expressions");

let first_value = create_first_value(
"FIRST_VALUE",
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
Arc::new(create_first_value_accumulator),
);

match new_self.register_udaf(Arc::new(first_value)) {
Ok(Some(existing_udaf)) => {
debug!("Overwrite existing UDAF: {}", existing_udaf.name());
}
Ok(None) => {}
Err(err) => {
panic!("Failed to register UDAF: {}", err);
}
}
functions_aggregate::register_all(&mut new_self)
.expect("can not register aggregate functions");

new_self
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,11 @@ pub mod functions_array {
pub use datafusion_functions_array::*;
}

/// re-export of [`datafusion_functions_aggregate`] crate
pub mod functions_aggregate {
pub use datafusion_functions_aggregate::*;
}

#[cfg(test)]
pub mod test;
pub mod test_util;
Expand Down
84 changes: 0 additions & 84 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::expr::{
use crate::function::{
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
};
use crate::udaf::format_state_name;
use crate::{
aggregate_function, built_in_function, conditional_expressions::CaseBuilder,
logical_plan::Subquery, AggregateUDF, BuiltinScalarFunction, Expr, LogicalPlan,
Expand Down Expand Up @@ -708,17 +707,6 @@ pub fn create_udaf(
))
}

/// Creates a new UDAF with a specific signature, state type and return type.
/// The signature and state type must match the `Accumulator's implementation`.
/// TOOD: We plan to move aggregate function to its own crate. This function will be deprecated then.
pub fn create_first_value(
name: &str,
signature: Signature,
accumulator: AccumulatorFactoryFunction,
) -> AggregateUDF {
AggregateUDF::from(FirstValue::new(name, signature, accumulator))
}

/// Implements [`AggregateUDFImpl`] for functions that have a single signature and
/// return type.
pub struct SimpleAggregateUDF {
Expand Down Expand Up @@ -813,78 +801,6 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
}
}

pub struct FirstValue {
name: String,
signature: Signature,
accumulator: AccumulatorFactoryFunction,
}

impl Debug for FirstValue {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("FirstValue")
.field("name", &self.name)
.field("signature", &self.signature)
.field("accumulator", &"<FUNC>")
.finish()
}
}

impl FirstValue {
pub fn new(
name: impl Into<String>,
signature: Signature,
accumulator: AccumulatorFactoryFunction,
) -> Self {
let name = name.into();
Self {
name,
signature,
accumulator,
}
}
}

impl AggregateUDFImpl for FirstValue {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
&self.name
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].clone())
}

fn accumulator(
&self,
acc_args: AccumulatorArgs,
) -> Result<Box<dyn crate::Accumulator>> {
(self.accumulator)(acc_args)
}

fn state_fields(
&self,
name: &str,
value_type: DataType,
ordering_fields: Vec<Field>,
) -> Result<Vec<Field>> {
let mut fields = vec![Field::new(
format_state_name(name, "first_value"),
value_type,
true,
)];
fields.extend(ordering_fields);
fields.push(Field::new("is_set", DataType::Boolean, true));
Ok(fields)
}
}

/// Creates a new UDWF with a specific signature, state type and return type.
///
/// The signature and state type must match the [`PartitionEvaluator`]'s implementation`.
Expand Down
7 changes: 1 addition & 6 deletions datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use crate::function::AccumulatorArgs;
use crate::groups_accumulator::GroupsAccumulator;
use crate::utils::format_state_name;
use crate::{Accumulator, Expr};
use crate::{AccumulatorFactoryFunction, ReturnTypeFunction, Signature};
use arrow::datatypes::{DataType, Field};
Expand Down Expand Up @@ -447,9 +448,3 @@ impl AggregateUDFImpl for AggregateUDFLegacyWrapper {
(self.accumulator)(acc_args)
}
}

/// returns the name of the state
/// TODO: Remove duplicated function in physical-expr
pub(crate) fn format_state_name(name: &str, state_name: &str) -> String {
format!("{name}[{state_name}]")
}
5 changes: 5 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,11 @@ pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema {
}
}

/// Build state name. State is the intermidiate state of the aggregate function.
pub fn format_state_name(name: &str, state_name: &str) -> String {
format!("{name}[{state_name}]")
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
44 changes: 44 additions & 0 deletions datafusion/functions-aggregate/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-functions-aggregate"
description = "Aggregate function packages for the DataFusion query engine"
keywords = ["datafusion", "logical", "plan", "expressions"]
readme = "README.md"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
rust-version = { workspace = true }

[lib]
name = "datafusion_functions_aggregate"
path = "src/lib.rs"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
log = { workspace = true }
paste = "1.0.14"
Loading

0 comments on commit 2f1c3ab

Please sign in to comment.