Skip to content

Commit

Permalink
Add ColumnarValue::values_to_arrays, deprecate `columnar_values_to_…
Browse files Browse the repository at this point in the history
…array` (apache#9114)

* Add `ColumnarValue::values_to_array`

* Apply suggestions from code review

Co-authored-by: Liang-Chi Hsieh <[email protected]>

---------

Co-authored-by: Liang-Chi Hsieh <[email protected]>
  • Loading branch information
alamb and viirya authored Feb 5, 2024
1 parent 33b52ba commit dfb6435
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 48 deletions.
7 changes: 4 additions & 3 deletions datafusion-examples/examples/simple_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::cast::as_float64_array;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::functions::columnar_values_to_array;
use std::sync::Arc;

/// create local execution context with an in-memory table:
Expand Down Expand Up @@ -71,13 +70,15 @@ async fn main() -> Result<()> {
// this is guaranteed by DataFusion based on the function's signature.
assert_eq!(args.len(), 2);

let args = columnar_values_to_array(args)?;
// Expand the arguments to arrays (this is simple, but inefficient for
// single constant values).
let args = ColumnarValue::values_to_arrays(args)?;

// 1. cast both arguments to f64. These casts MUST be aligned with the signature or this function panics!
let base = as_float64_array(&args[0]).expect("cast failed");
let exponent = as_float64_array(&args[1]).expect("cast failed");

// this is guaranteed by DataFusion. We place it just to make it obvious.
// The array lengths is guaranteed by DataFusion. We assert here to make it obvious.
assert_eq!(exponent.len(), base.len());

// 2. perform the computation
Expand Down
164 changes: 163 additions & 1 deletion datafusion/expr/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use std::sync::Arc;

/// Represents the result of evaluating an expression: either a single
Expand Down Expand Up @@ -75,4 +75,166 @@ impl ColumnarValue {
pub fn create_null_array(num_rows: usize) -> Self {
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
}

/// Converts [`ColumnarValue`]s to [`ArrayRef`]s with the same length.
///
/// # Performance Note
///
/// This function expands any [`ScalarValue`] to an array. This expansion
/// permits using a single function in terms of arrays, but it can be
/// inefficient compared to handling the scalar value directly.
///
/// Thus, it is recommended to provide specialized implementations for
/// scalar values if performance is a concern.
///
/// # Errors
///
/// If there are multiple array arguments that have different lengths
pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
if args.is_empty() {
return Ok(vec![]);
}

let mut array_len = None;
for arg in args {
array_len = match (arg, array_len) {
(ColumnarValue::Array(a), None) => Some(a.len()),
(ColumnarValue::Array(a), Some(array_len)) => {
if array_len == a.len() {
Some(array_len)
} else {
return internal_err!(
"Arguments has mixed length. Expected length: {array_len}, found length: {}", a.len()
);
}
}
(ColumnarValue::Scalar(_), array_len) => array_len,
}
}

// If array_len is none, it means there are only scalars, so make a 1 element array
let inferred_length = array_len.unwrap_or(1);

let args = args
.iter()
.map(|arg| arg.clone().into_array(inferred_length))
.collect::<Result<Vec<_>>>()?;

Ok(args)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn values_to_arrays() {
// (input, expected)
let cases = vec![
// empty
TestCase {
input: vec![],
expected: vec![],
},
// one array of length 3
TestCase {
input: vec![ColumnarValue::Array(make_array(1, 3))],
expected: vec![make_array(1, 3)],
},
// two arrays length 3
TestCase {
input: vec![
ColumnarValue::Array(make_array(1, 3)),
ColumnarValue::Array(make_array(2, 3)),
],
expected: vec![make_array(1, 3), make_array(2, 3)],
},
// array and scalar
TestCase {
input: vec![
ColumnarValue::Array(make_array(1, 3)),
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
],
expected: vec![
make_array(1, 3),
make_array(100, 3), // scalar is expanded
],
},
// scalar and array
TestCase {
input: vec![
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
ColumnarValue::Array(make_array(1, 3)),
],
expected: vec![
make_array(100, 3), // scalar is expanded
make_array(1, 3),
],
},
// multiple scalars and array
TestCase {
input: vec![
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
ColumnarValue::Array(make_array(1, 3)),
ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
],
expected: vec![
make_array(100, 3), // scalar is expanded
make_array(1, 3),
make_array(200, 3), // scalar is expanded
],
},
];
for case in cases {
case.run();
}
}

#[test]
#[should_panic(
expected = "Arguments has mixed length. Expected length: 3, found length: 4"
)]
fn values_to_arrays_mixed_length() {
ColumnarValue::values_to_arrays(&[
ColumnarValue::Array(make_array(1, 3)),
ColumnarValue::Array(make_array(2, 4)),
])
.unwrap();
}

#[test]
#[should_panic(
expected = "Arguments has mixed length. Expected length: 3, found length: 7"
)]
fn values_to_arrays_mixed_length_and_scalar() {
ColumnarValue::values_to_arrays(&[
ColumnarValue::Array(make_array(1, 3)),
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
ColumnarValue::Array(make_array(2, 7)),
])
.unwrap();
}

struct TestCase {
input: Vec<ColumnarValue>,
expected: Vec<ArrayRef>,
}

impl TestCase {
fn run(self) {
let Self { input, expected } = self;

assert_eq!(
ColumnarValue::values_to_arrays(&input).unwrap(),
expected,
"\ninput: {input:?}\nexpected: {expected:?}"
);
}
}

/// Makes an array of length `len` with all elements set to `val`
fn make_array(val: i32, len: usize) -> ArrayRef {
Arc::new(arrow::array::Int32Array::from(vec![val; len]))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,6 @@ mod tests {
use datafusion_physical_expr::execution_props::ExecutionProps;

use chrono::{DateTime, TimeZone, Utc};
use datafusion_physical_expr::functions::columnar_values_to_array;

// ------------------------------
// --- ExprSimplifier tests -----
Expand Down Expand Up @@ -1461,7 +1460,7 @@ mod tests {
let return_type = Arc::new(DataType::Int32);

let fun = Arc::new(|args: &[ColumnarValue]| {
let args = columnar_values_to_array(args)?;
let args = ColumnarValue::values_to_arrays(args)?;

let arg0 = as_int32_array(&args[0])?;
let arg1 = as_int32_array(&args[1])?;
Expand Down
44 changes: 2 additions & 42 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,49 +173,9 @@ pub(crate) enum Hint {
AcceptsSingular,
}

/// A helper function used to infer the length of arguments of Scalar functions and convert
/// [`ColumnarValue`]s to [`ArrayRef`]s with the inferred length. Note that this function
/// only works for functions that accept either that all arguments are scalars or all arguments
/// are arrays with same length. Otherwise, it will return an error.
#[deprecated(since = "36.0.0", note = "Use ColumarValue::values_to_arrays instead")]
pub fn columnar_values_to_array(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
if args.is_empty() {
return Ok(vec![]);
}

let len = args
.iter()
.fold(Option::<usize>::None, |acc, arg| match arg {
ColumnarValue::Scalar(_) if acc.is_none() => Some(1),
ColumnarValue::Scalar(_) => {
if let Some(1) = acc {
acc
} else {
None
}
}
ColumnarValue::Array(a) => {
if let Some(l) = acc {
if l == a.len() {
acc
} else {
None
}
} else {
Some(a.len())
}
}
});

let inferred_length = len.ok_or(DataFusionError::Internal(
"Arguments has mixed length".to_string(),
))?;

let args = args
.iter()
.map(|arg| arg.clone().into_array(inferred_length))
.collect::<Result<Vec<_>>>()?;

Ok(args)
ColumnarValue::values_to_arrays(args)
}

/// Decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function
Expand Down

0 comments on commit dfb6435

Please sign in to comment.