Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Switch LastValue SQL workflow to UDAF version #10062

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ make_udaf_function!(
first_value_udaf
);

// Declares the `LastValue` aggregate through the crate's `make_udaf_function!`
// macro, passing the implementing type, the names `last_value` and
// `last_value_udaf`, the argument name `value`, and the description string.
// NOTE(review): presumably this generates the `last_value_udaf()` accessor that
// `register_all` calls, and a `last_value` expression helper; confirm against
// the macro definition.
make_udaf_function!(
LastValue,
last_value,
value,
"Returns the last value in a group of values.",
last_value_udaf
);

pub struct FirstValue {
signature: Signature,
aliases: Vec<String>,
Expand Down Expand Up @@ -514,6 +522,116 @@ impl PartialEq<dyn Any> for FirstValuePhysicalExpr {
}
}

/// User-defined aggregate function implementation registered under the name
/// `LAST_VALUE` (see its `AggregateUDFImpl` implementation).
pub struct LastValue {
/// Signature returned by `AggregateUDFImpl::signature`.
signature: Signature,
/// Names returned by `AggregateUDFImpl::aliases`.
aliases: Vec<String>,
}

/// Debug output shows the function name and signature; the accumulator entry
/// is rendered as the fixed placeholder `"<FUNC>"`.
impl Debug for LastValue {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let mut builder = f.debug_struct("LastValue");
        builder.field("name", &self.name());
        builder.field("signature", &self.signature);
        builder.field("accumulator", &"<FUNC>");
        builder.finish()
    }
}

impl Default for LastValue {
fn default() -> Self {
Self::new()
}
}

impl LastValue {
    /// Creates the aggregate with a single alias, `"LAST_VALUE"`, and an
    /// immutable signature taking exactly one argument drawn from `NUMERICS`.
    pub fn new() -> Self {
        let signature =
            Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable);
        let aliases = vec!["LAST_VALUE".to_owned()];
        Self { signature, aliases }
    }
}

impl AggregateUDFImpl for LastValue {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"LAST_VALUE"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].clone())
}

fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
let mut all_sort_orders = vec![];

// Construct PhysicalSortExpr objects from Expr objects:
let mut sort_exprs = vec![];
for expr in acc_args.sort_exprs {
if let Expr::Sort(sort) = expr {
if let Expr::Column(col) = sort.expr.as_ref() {
let name = &col.name;
let e = expressions::column::col(name, acc_args.schema)?;
sort_exprs.push(PhysicalSortExpr {
expr: e,
options: SortOptions {
descending: !sort.asc,
nulls_first: sort.nulls_first,
},
});
}
}
}
if !sort_exprs.is_empty() {
all_sort_orders.extend(sort_exprs);
}

let ordering_req = all_sort_orders;

let ordering_dtypes = ordering_req
.iter()
.map(|e| e.expr.data_type(acc_args.schema))
.collect::<Result<Vec<_>>>()?;

let requirement_satisfied = ordering_req.is_empty();

LastValueAccumulator::try_new(
acc_args.data_type,
&ordering_dtypes,
ordering_req,
acc_args.ignore_nulls,
)
.map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
}

fn state_fields(
&self,
name: &str,
value_type: DataType,
ordering_fields: Vec<Field>,
) -> Result<Vec<Field>> {
let mut fields = vec![Field::new(
format_state_name(name, "last_value"),
value_type,
true,
)];
fields.extend(ordering_fields);
fields.push(Field::new("is_set", DataType::Boolean, true));
Ok(fields)
}

fn aliases(&self) -> &[String] {
&self.aliases
}
}

/// TO BE DEPRECATED: The built-in LAST_VALUE physical aggregate expression will be replaced by a UDAF in the future.
#[derive(Debug, Clone)]
pub struct LastValuePhysicalExpr {
Expand Down
5 changes: 4 additions & 1 deletion datafusion/functions-aggregate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ pub mod expr_fn {

/// Registers all enabled packages with a [`FunctionRegistry`]
pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
let functions: Vec<Arc<AggregateUDF>> = vec![first_last::first_value_udaf()];
let functions: Vec<Arc<AggregateUDF>> = vec![
first_last::first_value_udaf(),
first_last::last_value_udaf(),
];

functions.into_iter().try_for_each(|udf| {
let existing_udaf = registry.register_udaf(udf)?;
Expand Down
Loading