From 1adfcde32e62552bb149828acce9bb93bbbd3f47 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 23 Apr 2024 09:04:58 +0800 Subject: [PATCH] all done Signed-off-by: jayzhan211 --- .../src/expressions/case.rs | 45 +- .../src/expressions/mod.rs | 2 + .../src/expressions/no_op.rs | 2 +- .../physical-expr-common/src/physical_expr.rs | 115 +++-- .../physical-expr/src/expressions/mod.rs | 6 +- datafusion/physical-expr/src/planner.rs | 414 +----------------- 6 files changed, 108 insertions(+), 476 deletions(-) rename datafusion/{physical-expr => physical-expr-common}/src/expressions/case.rs (97%) rename datafusion/{physical-expr => physical-expr-common}/src/expressions/no_op.rs (98%) diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr-common/src/expressions/case.rs similarity index 97% rename from datafusion/physical-expr/src/expressions/case.rs rename to datafusion/physical-expr-common/src/expressions/case.rs index e376d3e7bbac3..683e29f8b08b6 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr-common/src/expressions/case.rs @@ -19,9 +19,10 @@ use std::borrow::Cow; use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; -use crate::expressions::{try_cast, NoOp}; +use crate::expressions::no_op::NoOp; +use crate::expressions::try_cast::try_cast; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use arrow::array::*; use arrow::compute::kernels::cmp::eq; @@ -414,7 +415,11 @@ pub fn case( #[cfg(test)] mod tests { use super::*; - use crate::expressions::{binary, cast, col, lit}; + + use crate::expressions::binary::binary; + use crate::expressions::cast::cast; + use crate::expressions::column::col; + use crate::expressions::literal::{lit, Literal}; use arrow::array::StringArray; use arrow::buffer::Buffer; @@ -959,16 +964,15 @@ mod tests { let expr2 = expr .clone() .transform(|e| { - let transformed = - match e.as_any().downcast_ref::() { - Some(lit_value) => match lit_value.value() { - ScalarValue::Utf8(Some(str_value)) => { - Some(lit(str_value.to_uppercase())) - } - _ => None, - }, + let transformed = match e.as_any().downcast_ref::() { + Some(lit_value) => match lit_value.value() { + ScalarValue::Utf8(Some(str_value)) => { + Some(lit(str_value.to_uppercase())) + } _ => None, - }; + }, + _ => None, + }; Ok(if let Some(transformed) = transformed { Transformed::yes(transformed) } else { @@ -981,16 +985,15 @@ mod tests { let expr3 = expr .clone() .transform_down(|e| { - let transformed = - match e.as_any().downcast_ref::() { - Some(lit_value) => match lit_value.value() { - ScalarValue::Utf8(Some(str_value)) => { - Some(lit(str_value.to_uppercase())) - } - _ => None, - }, + let transformed = match e.as_any().downcast_ref::() { + Some(lit_value) => match lit_value.value() { + ScalarValue::Utf8(Some(str_value)) => { + Some(lit(str_value.to_uppercase())) + } _ => None, - }; + }, + _ => None, + }; Ok(if let Some(transformed) = transformed { Transformed::yes(transformed) } else { diff --git a/datafusion/physical-expr-common/src/expressions/mod.rs b/datafusion/physical-expr-common/src/expressions/mod.rs index 48fd2a1a07a34..a6da353bf25f9 100644 --- a/datafusion/physical-expr-common/src/expressions/mod.rs +++ b/datafusion/physical-expr-common/src/expressions/mod.rs @@ -19,6 +19,7 @@ #[macro_use] pub mod binary; +pub mod case; pub mod cast; pub mod column; pub mod datum; @@ -28,5 +29,6 @@ pub mod is_null; pub mod like; pub mod literal; pub mod negative; +pub mod no_op; pub mod not; pub mod try_cast; diff --git a/datafusion/physical-expr/src/expressions/no_op.rs b/datafusion/physical-expr-common/src/expressions/no_op.rs similarity index 98% rename from datafusion/physical-expr/src/expressions/no_op.rs rename to datafusion/physical-expr-common/src/expressions/no_op.rs index b558ccab154d5..109e28d7c0f86 100644 --- a/datafusion/physical-expr/src/expressions/no_op.rs +++ b/datafusion/physical-expr-common/src/expressions/no_op.rs @@ -27,7 +27,7 @@ use arrow::{ }; use crate::physical_expr::down_cast_any_ref; -use crate::PhysicalExpr; +use crate::physical_expr::PhysicalExpr; use datafusion_common::{internal_err, Result}; use datafusion_expr::ColumnarValue; diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index d84f86fee6061..73d7de32183c1 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -39,6 +39,7 @@ use datafusion_expr::{ }; use crate::expressions::binary::binary; +use crate::expressions::case; use crate::expressions::cast::cast; use crate::expressions::column::Column; use crate::expressions::in_list::in_list; @@ -332,7 +333,6 @@ pub fn physical_exprs_equal( /// * `e` - The logical expression /// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references /// to qualified or unqualified fields by name. -#[allow(clippy::only_used_in_recursion)] pub fn create_physical_expr( e: &Expr, input_dfschema: &DFSchema, @@ -451,43 +451,43 @@ pub fn create_physical_expr( input_schema, ) } - // Expr::Case(case) => { - // let expr: Option> = if let Some(e) = &case.expr { - // Some(create_physical_expr( - // e.as_ref(), - // input_dfschema, - // execution_props, - // )?) - // } else { - // None - // }; - // let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case - // .when_then_expr - // .iter() - // .map(|(w, t)| (w.as_ref(), t.as_ref())) - // .unzip(); - // let when_expr = - // create_physical_exprs(when_expr, input_dfschema, execution_props)?; - // let then_expr = - // create_physical_exprs(then_expr, input_dfschema, execution_props)?; - // let when_then_expr: Vec<(Arc, Arc)> = - // when_expr - // .iter() - // .zip(then_expr.iter()) - // .map(|(w, t)| (w.clone(), t.clone())) - // .collect(); - // let else_expr: Option> = - // if let Some(e) = &case.else_expr { - // Some(create_physical_expr( - // e.as_ref(), - // input_dfschema, - // execution_props, - // )?) - // } else { - // None - // }; - // Ok(expressions::case(expr, when_then_expr, else_expr)?) - // } + Expr::Case(case) => { + let expr: Option> = if let Some(e) = &case.expr { + Some(create_physical_expr( + e.as_ref(), + input_dfschema, + execution_props, + )?) + } else { + None + }; + let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case + .when_then_expr + .iter() + .map(|(w, t)| (w.as_ref(), t.as_ref())) + .unzip(); + let when_expr = + create_physical_exprs(when_expr, input_dfschema, execution_props)?; + let then_expr = + create_physical_exprs(then_expr, input_dfschema, execution_props)?; + let when_then_expr: Vec<(Arc, Arc)> = + when_expr + .iter() + .zip(then_expr.iter()) + .map(|(w, t)| (w.clone(), t.clone())) + .collect(); + let else_expr: Option> = + if let Some(e) = &case.else_expr { + Some(create_physical_expr( + e.as_ref(), + input_dfschema, + execution_props, + )?) + } else { + None + }; + Ok(case::case(expr, when_then_expr, else_expr)?) + } Expr::Cast(Cast { expr, data_type }) => cast( create_physical_expr(expr, input_dfschema, execution_props)?, input_schema, @@ -604,3 +604,42 @@ where .map(|expr| create_physical_expr(expr, input_dfschema, execution_props)) .collect::>>() } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray}; + use arrow::datatypes::{DataType, Field, Schema}; + + use datafusion_common::{DFSchema, Result}; + use datafusion_expr::execution_props::ExecutionProps; + use datafusion_expr::{col, lit}; + + use super::create_physical_expr; + + #[test] + fn test_create_physical_expr_scalar_input_output() -> Result<()> { + let expr = col("letter").eq(lit("A")); + + let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]); + let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?; + let p = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?; + + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(StringArray::from_iter_values(vec![ + "A", "B", "C", "D", + ]))], + )?; + let result = p.evaluate(&batch)?; + let result = result.into_array(4).expect("Failed to convert to array"); + + assert_eq!( + &result, + &(Arc::new(BooleanArray::from(vec![true, false, false, false,])) as ArrayRef) + ); + + Ok(()) + } +} diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 50cad65b829dc..93fbcdfff7d25 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -17,9 +17,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution -mod case; mod column; -mod no_op; /// Module with some convenient methods used in expression building pub mod helpers { @@ -67,10 +65,10 @@ pub use datafusion_functions_aggregate::first_last::{ FirstValuePhysicalExpr as FirstValue, LastValuePhysicalExpr as LastValue, }; -pub use case::{case, CaseExpr}; pub use column::UnKnownColumn; pub use datafusion_expr::utils::format_state_name; pub use datafusion_physical_expr_common::expressions::binary::{binary, BinaryExpr}; +pub use datafusion_physical_expr_common::expressions::case::{case, CaseExpr}; pub use datafusion_physical_expr_common::expressions::cast::{ cast, cast_with_options, CastExpr, }; @@ -85,9 +83,9 @@ pub use datafusion_physical_expr_common::expressions::literal::{lit, Literal}; pub use datafusion_physical_expr_common::expressions::negative::{ negative, NegativeExpr, }; +pub use datafusion_physical_expr_common::expressions::no_op::NoOp; pub use datafusion_physical_expr_common::expressions::not::{not, NotExpr}; pub use datafusion_physical_expr_common::expressions::try_cast::{try_cast, TryCastExpr}; -pub use no_op::NoOp; #[cfg(test)] pub(crate) mod tests { diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 285877ee9ef6f..ac25bf28a44aa 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -15,415 +15,5 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - -use arrow::datatypes::Schema; - -use datafusion_common::{ - exec_err, internal_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, -}; -use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction}; -use datafusion_expr::var_provider::is_system_variables; -use datafusion_expr::var_provider::VarType; -use datafusion_expr::{ - binary_expr, Between, BinaryExpr, Expr, GetFieldAccess, GetIndexedField, Like, - Operator, ScalarFunctionDefinition, TryCast, -}; - -use crate::{ - expressions::{self, binary, like, Column, Literal}, - PhysicalExpr, -}; -use datafusion_physical_expr_common::udf; - -/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1 -/// AS int)`. -/// -/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical -/// planning, and can be evaluated directly on a [RecordBatch]. They are -/// normally created from [Expr] by a [PhysicalPlanner] and can be created -/// directly using [create_physical_expr]. -/// -/// A Physical expression knows its type, nullability and how to evaluate itself. -/// -/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html -/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html -/// -/// # Example: Create `PhysicalExpr` from `Expr` -/// ``` -/// # use arrow::datatypes::{DataType, Field, Schema}; -/// # use datafusion_common::DFSchema; -/// # use datafusion_expr::{Expr, col, lit}; -/// # use datafusion_physical_expr::create_physical_expr; -/// # use datafusion_expr::execution_props::ExecutionProps; -/// // For a logical expression `a = 1`, we can create a physical expression -/// let expr = col("a").eq(lit(1)); -/// // To create a PhysicalExpr we need 1. a schema -/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); -/// let df_schema = DFSchema::try_from(schema).unwrap(); -/// // 2. ExecutionProps -/// let props = ExecutionProps::new(); -/// // We can now create a PhysicalExpr: -/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); -/// ``` -/// -/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue] -/// ``` -/// # use std::sync::Arc; -/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch}; -/// # use arrow::datatypes::{DataType, Field, Schema}; -/// # use datafusion_common::{assert_batches_eq, DFSchema}; -/// # use datafusion_expr::{Expr, col, lit, ColumnarValue}; -/// # use datafusion_physical_expr::create_physical_expr; -/// # use datafusion_expr::execution_props::ExecutionProps; -/// # let expr = col("a").eq(lit(1)); -/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); -/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap(); -/// # let props = ExecutionProps::new(); -/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this: -/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); -/// // Input of [1,2,3] -/// let input_batch = RecordBatch::try_from_iter(vec![ -/// ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _) -/// ]).unwrap(); -/// // The result is a ColumnarValue (either an Array or a Scalar) -/// let result = physical_expr.evaluate(&input_batch).unwrap(); -/// // In this case, a BooleanArray with the result of the comparison -/// let ColumnarValue::Array(arr) = result else { -/// panic!("Expected an array") -/// }; -/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false])); -/// ``` -/// -/// [ColumnarValue]: datafusion_expr::ColumnarValue -/// -/// Create a physical expression from a logical expression ([Expr]). -/// -/// # Arguments -/// -/// * `e` - The logical expression -/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references -/// to qualified or unqualified fields by name. -pub fn create_physical_expr( - e: &Expr, - input_dfschema: &DFSchema, - execution_props: &ExecutionProps, -) -> Result> { - use datafusion_physical_expr_common::physical_expr::create_physical_expr as create_physical_expr_common; - - // PR #10074: Temporary solution, after all the logic is moved to common, we can remove this function - let res = create_physical_expr_common(e, input_dfschema, execution_props); - if res.is_ok() { - return res; - } - - let input_schema: &Schema = &input_dfschema.into(); - - match e { - Expr::Alias(Alias { expr, .. }) => { - Ok(create_physical_expr(expr, input_dfschema, execution_props)?) - } - Expr::Column(c) => { - let idx = input_dfschema.index_of_column(c)?; - Ok(Arc::new(Column::new(&c.name, idx))) - } - Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), - Expr::ScalarVariable(_, variable_names) => { - if is_system_variables(variable_names) { - match execution_props.get_var_provider(VarType::System) { - Some(provider) => { - let scalar_value = provider.get_value(variable_names.clone())?; - Ok(Arc::new(Literal::new(scalar_value))) - } - _ => plan_err!("No system variable provider found"), - } - } else { - match execution_props.get_var_provider(VarType::UserDefined) { - Some(provider) => { - let scalar_value = provider.get_value(variable_names.clone())?; - Ok(Arc::new(Literal::new(scalar_value))) - } - _ => plan_err!("No user defined variable provider found"), - } - } - } - Expr::IsTrue(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsNotDistinctFrom, - Expr::Literal(ScalarValue::Boolean(Some(true))), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::IsNotTrue(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsDistinctFrom, - Expr::Literal(ScalarValue::Boolean(Some(true))), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::IsFalse(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsNotDistinctFrom, - Expr::Literal(ScalarValue::Boolean(Some(false))), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::IsNotFalse(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsDistinctFrom, - Expr::Literal(ScalarValue::Boolean(Some(false))), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::IsUnknown(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsNotDistinctFrom, - Expr::Literal(ScalarValue::Boolean(None)), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::IsNotUnknown(expr) => { - let binary_op = binary_expr( - expr.as_ref().clone(), - Operator::IsDistinctFrom, - Expr::Literal(ScalarValue::Boolean(None)), - ); - create_physical_expr(&binary_op, input_dfschema, execution_props) - } - Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - // Create physical expressions for left and right operands - let lhs = create_physical_expr(left, input_dfschema, execution_props)?; - let rhs = create_physical_expr(right, input_dfschema, execution_props)?; - // Note that the logical planner is responsible - // for type coercion on the arguments (e.g. if one - // argument was originally Int32 and one was - // Int64 they will both be coerced to Int64). - // - // There should be no coercion during physical - // planning. - binary(lhs, *op, rhs, input_schema) - } - Expr::Like(Like { - negated, - expr, - pattern, - escape_char, - case_insensitive, - }) => { - if escape_char.is_some() { - return exec_err!("LIKE does not support escape_char"); - } - let physical_expr = - create_physical_expr(expr, input_dfschema, execution_props)?; - let physical_pattern = - create_physical_expr(pattern, input_dfschema, execution_props)?; - like( - *negated, - *case_insensitive, - physical_expr, - physical_pattern, - input_schema, - ) - } - Expr::Case(case) => { - let expr: Option> = if let Some(e) = &case.expr { - Some(create_physical_expr( - e.as_ref(), - input_dfschema, - execution_props, - )?) - } else { - None - }; - let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case - .when_then_expr - .iter() - .map(|(w, t)| (w.as_ref(), t.as_ref())) - .unzip(); - let when_expr = - create_physical_exprs(when_expr, input_dfschema, execution_props)?; - let then_expr = - create_physical_exprs(then_expr, input_dfschema, execution_props)?; - let when_then_expr: Vec<(Arc, Arc)> = - when_expr - .iter() - .zip(then_expr.iter()) - .map(|(w, t)| (w.clone(), t.clone())) - .collect(); - let else_expr: Option> = - if let Some(e) = &case.else_expr { - Some(create_physical_expr( - e.as_ref(), - input_dfschema, - execution_props, - )?) - } else { - None - }; - Ok(expressions::case(expr, when_then_expr, else_expr)?) - } - Expr::Cast(Cast { expr, data_type }) => expressions::cast( - create_physical_expr(expr, input_dfschema, execution_props)?, - input_schema, - data_type.clone(), - ), - Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast( - create_physical_expr(expr, input_dfschema, execution_props)?, - input_schema, - data_type.clone(), - ), - Expr::Not(expr) => { - expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?) - } - Expr::Negative(expr) => expressions::negative( - create_physical_expr(expr, input_dfschema, execution_props)?, - input_schema, - ), - Expr::IsNull(expr) => expressions::is_null(create_physical_expr( - expr, - input_dfschema, - execution_props, - )?), - Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr( - expr, - input_dfschema, - execution_props, - )?), - Expr::GetIndexedField(GetIndexedField { expr: _, field }) => match field { - GetFieldAccess::NamedStructField { name: _ } => { - internal_err!( - "NamedStructField should be rewritten in OperatorToFunction" - ) - } - GetFieldAccess::ListIndex { key: _ } => { - internal_err!("ListIndex should be rewritten in OperatorToFunction") - } - GetFieldAccess::ListRange { - start: _, - stop: _, - stride: _, - } => { - internal_err!("ListRange should be rewritten in OperatorToFunction") - } - }, - - Expr::ScalarFunction(ScalarFunction { func_def, args }) => { - let physical_args = - create_physical_exprs(args, input_dfschema, execution_props)?; - - match func_def { - ScalarFunctionDefinition::UDF(fun) => udf::create_physical_expr( - fun.clone().as_ref(), - &physical_args, - input_schema, - args, - input_dfschema, - ), - ScalarFunctionDefinition::Name(_) => { - internal_err!("Function `Expr` with name should be resolved.") - } - } - } - Expr::Between(Between { - expr, - negated, - low, - high, - }) => { - let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?; - let low_expr = create_physical_expr(low, input_dfschema, execution_props)?; - let high_expr = create_physical_expr(high, input_dfschema, execution_props)?; - - // rewrite the between into the two binary operators - let binary_expr = binary( - binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?, - Operator::And, - binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?, - input_schema, - ); - - if *negated { - expressions::not(binary_expr?) - } else { - binary_expr - } - } - Expr::InList(InList { - expr, - list, - negated, - }) => match expr.as_ref() { - Expr::Literal(ScalarValue::Utf8(None)) => { - Ok(expressions::lit(ScalarValue::Boolean(None))) - } - _ => { - let value_expr = - create_physical_expr(expr, input_dfschema, execution_props)?; - - let list_exprs = - create_physical_exprs(list, input_dfschema, execution_props)?; - expressions::in_list(value_expr, list_exprs, negated, input_schema) - } - }, - other => { - not_impl_err!("Physical plan does not support logical expression {other:?}") - } - } -} - -/// Create vector of Physical Expression from a vector of logical expression -pub fn create_physical_exprs<'a, I>( - exprs: I, - input_dfschema: &DFSchema, - execution_props: &ExecutionProps, -) -> Result>> -where - I: IntoIterator, -{ - exprs - .into_iter() - .map(|expr| create_physical_expr(expr, input_dfschema, execution_props)) - .collect::>>() -} - -#[cfg(test)] -mod tests { - use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray}; - use arrow_schema::{DataType, Field, Schema}; - - use datafusion_common::{DFSchema, Result}; - use datafusion_expr::{col, lit}; - - use super::*; - - #[test] - fn test_create_physical_expr_scalar_input_output() -> Result<()> { - let expr = col("letter").eq(lit("A")); - - let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]); - let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?; - let p = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?; - - let batch = RecordBatch::try_new( - Arc::new(schema), - vec![Arc::new(StringArray::from_iter_values(vec![ - "A", "B", "C", "D", - ]))], - )?; - let result = p.evaluate(&batch)?; - let result = result.into_array(4).expect("Failed to convert to array"); - - assert_eq!( - &result, - &(Arc::new(BooleanArray::from(vec![true, false, false, false,])) as ArrayRef) - ); - - Ok(()) - } -} +pub use datafusion_physical_expr_common::physical_expr::create_physical_expr; +pub use datafusion_physical_expr_common::physical_expr::create_physical_exprs;