Skip to content

Commit

Permalink
Move null argument handling helper to utils
Browse files Browse the repository at this point in the history
  • Loading branch information
jcsherin committed Oct 16, 2024
1 parent 47c2b9c commit 4385f21
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
12 changes: 10 additions & 2 deletions datafusion/functions-window/src/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

//! `lead` and `lag` window function implementations
use crate::utils::{get_casted_value, get_scalar_value_from_args, get_signed_integer};
use crate::utils::{
get_casted_value, get_scalar_value_from_args, get_signed_integer,
rewrite_null_expr_and_data_type,
};
use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::arrow::datatypes::Field;
Expand Down Expand Up @@ -178,9 +181,14 @@ impl WindowUDFImpl for WindowShift {
.input_types()
.first()
.unwrap_or(&DataType::Null);
// See https://github.com/apache/datafusion/pull/12811
let (_expr, return_type) = rewrite_null_expr_and_data_type(
partition_evaluator_args.input_exprs(),
return_type,
)?;
let default_value = get_casted_value(
get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 2)?,
return_type,
&return_type,
)?;

Ok(Box::new(WindowShiftEvaluator {
Expand Down
38 changes: 37 additions & 1 deletion datafusion/functions-window/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{exec_err, DataFusionError, ScalarValue};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use std::sync::Arc;

Expand All @@ -43,14 +44,49 @@ pub(crate) fn get_casted_value(
}
}

/// Rewrites the NULL expression (1st argument) with an expression
/// which is the same data type as the default value (3rd argument).
/// Also rewrites the return type with the same data type as the
/// default value.
///
/// If a default value is not provided, or it is NULL the original
/// expression (1st argument) and return type is returned without
/// any modifications.
pub(crate) fn rewrite_null_expr_and_data_type(
args: &[Arc<dyn PhysicalExpr>],
expr_type: &DataType,
) -> datafusion_common::Result<(Arc<dyn PhysicalExpr>, DataType)> {
assert!(!args.is_empty());
let expr = Arc::clone(&args[0]);

// The input expression and the return is type is unchanged
// when the input expression is not NULL.
if !expr_type.is_null() {
return Ok((expr, expr_type.clone()));
}

get_scalar_value_from_args(args, 2)?
.and_then(|value| {
ScalarValue::try_from(value.data_type().clone())
.map(|sv| {
Ok((
Arc::new(Literal::new(sv)) as Arc<dyn PhysicalExpr>,
value.data_type().clone(),
))
})
.ok()
})
.unwrap_or(Ok((expr, expr_type.clone())))
}

pub(crate) fn get_scalar_value_from_args(
args: &[Arc<dyn PhysicalExpr>],
index: usize,
) -> datafusion_common::Result<Option<ScalarValue>> {
Ok(if let Some(field) = args.get(index) {
let tmp = field
.as_any()
.downcast_ref::<datafusion_physical_expr::expressions::Literal>()
.downcast_ref::<Literal>()
.ok_or_else(|| DataFusionError::NotImplemented(
format!("There is only support Literal types for field at idx: {index} in Window Function"),
))?
Expand Down

0 comments on commit 4385f21

Please sign in to comment.