diff --git a/datafusion-examples/examples/advanced_udaf.rs b/datafusion-examples/examples/advanced_udaf.rs index 4b8ca57e4b40..cf284472212f 100644 --- a/datafusion-examples/examples/advanced_udaf.rs +++ b/datafusion-examples/examples/advanced_udaf.rs @@ -31,7 +31,7 @@ use datafusion::error::Result; use datafusion::prelude::*; use datafusion_common::{cast::as_float64_array, ScalarValue}; use datafusion_expr::{ - function::{AccumulatorArgs, GroupsAccumulatorSupportedArgs, StateFieldsArgs}, + function::{AccumulatorArgs, StateFieldsArgs}, Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature, }; @@ -101,10 +101,7 @@ impl AggregateUDFImpl for GeoMeanUdaf { /// Tell DataFusion that this aggregate supports the more performant `GroupsAccumulator` /// which is used for cases when there are grouping columns in the query - fn groups_accumulator_supported( - &self, - _args: GroupsAccumulatorSupportedArgs, - ) -> bool { + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { true } diff --git a/datafusion-examples/examples/simplify_udaf_expression.rs b/datafusion-examples/examples/simplify_udaf_expression.rs index 8dcd6fb6424d..08b6bcab0190 100644 --- a/datafusion-examples/examples/simplify_udaf_expression.rs +++ b/datafusion-examples/examples/simplify_udaf_expression.rs @@ -17,9 +17,7 @@ use arrow_schema::{Field, Schema}; use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; -use datafusion_expr::function::{ - AggregateFunctionSimplification, GroupsAccumulatorSupportedArgs, StateFieldsArgs, -}; +use datafusion_expr::function::{AggregateFunctionSimplification, StateFieldsArgs}; use datafusion_expr::simplify::SimplifyInfo; use std::{any::Any, sync::Arc}; @@ -76,10 +74,7 @@ impl AggregateUDFImpl for BetterAvgUdaf { unimplemented!("should not be invoked") } - fn groups_accumulator_supported( - &self, - _args: GroupsAccumulatorSupportedArgs, - ) -> bool { + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { true } diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 9d58465191e1..8c6790541597 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -40,7 +40,7 @@ use arrow::ipc::reader::FileReader; use arrow::ipc::writer::IpcWriteOptions; use arrow::ipc::{root_as_message, CompressionType}; use arrow_schema::{ArrowError, Schema, SchemaRef}; -use datafusion_common::{not_impl_err, DataFusionError, FileType, Statistics}; +use datafusion_common::{not_impl_err, DataFusionError, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use datafusion_physical_plan::insert::{DataSink, DataSinkExec}; @@ -136,10 +136,6 @@ impl FileFormat for ArrowFormat { order_requirements, )) as _) } - - fn file_type(&self) -> FileType { - FileType::ARROW - } } /// Implements [`DataSink`] for writing to arrow_ipc files diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index 132dae14c684..7b2c26a2c4f9 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -23,7 +23,6 @@ use std::sync::Arc; use arrow::datatypes::Schema; use arrow::datatypes::SchemaRef; use async_trait::async_trait; -use datafusion_common::FileType; use datafusion_physical_expr::PhysicalExpr; use 
object_store::{GetResultPayload, ObjectMeta, ObjectStore}; @@ -89,10 +88,6 @@ impl FileFormat for AvroFormat { let exec = AvroExec::new(conf); Ok(Arc::new(exec)) } - - fn file_type(&self) -> FileType { - FileType::AVRO - } } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 17bc7aafce85..ae5ac52025cf 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -42,7 +42,7 @@ use arrow::datatypes::SchemaRef; use arrow::datatypes::{DataType, Field, Fields, Schema}; use datafusion_common::config::CsvOptions; use datafusion_common::file_options::csv_writer::CsvWriterOptions; -use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; +use datafusion_common::{exec_err, not_impl_err, DataFusionError}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use datafusion_physical_plan::metrics::MetricsSet; @@ -280,10 +280,6 @@ impl FileFormat for CsvFormat { order_requirements, )) as _) } - - fn file_type(&self) -> FileType { - FileType::CSV - } } impl CsvFormat { @@ -549,8 +545,9 @@ mod tests { use arrow::compute::concat_batches; use datafusion_common::cast::as_string_array; + use datafusion_common::internal_err; use datafusion_common::stats::Precision; - use datafusion_common::{internal_err, GetExt}; + use datafusion_common::{FileType, GetExt}; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_expr::{col, lit}; diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 9f526e1c87b4..6e6c79848594 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -43,7 +43,7 @@ use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter}; use arrow_array::RecordBatch; use datafusion_common::config::JsonOptions; use datafusion_common::file_options::json_writer::JsonWriterOptions; -use datafusion_common::{not_impl_err, FileType}; +use datafusion_common::not_impl_err; use datafusion_execution::TaskContext; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use datafusion_physical_plan::metrics::MetricsSet; @@ -184,10 +184,6 @@ impl FileFormat for JsonFormat { order_requirements, )) as _) } - - fn file_type(&self) -> FileType { - FileType::JSON - } } impl Default for JsonSerializer { diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index fdb89a264951..243a91b7437b 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -41,7 +41,7 @@ use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::{ExecutionPlan, Statistics}; -use datafusion_common::{not_impl_err, FileType}; +use datafusion_common::not_impl_err; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use async_trait::async_trait; @@ -104,9 +104,6 @@ pub trait FileFormat: Send + Sync + fmt::Debug { ) -> Result> { not_impl_err!("Writer not implemented for this format") } - - /// Returns the FileType corresponding to this FileFormat - fn file_type(&self) -> FileType; } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index fa379eb5b445..8182ced6f228 100644 --- 
a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -47,7 +47,7 @@ use datafusion_common::config::TableParquetOptions; use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::stats::Precision; use datafusion_common::{ - exec_err, internal_datafusion_err, not_impl_err, DataFusionError, FileType, + exec_err, internal_datafusion_err, not_impl_err, DataFusionError, }; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; @@ -286,10 +286,6 @@ impl FileFormat for ParquetFormat { order_requirements, )) as _) } - - fn file_type(&self) -> FileType { - FileType::PARQUET - } } fn summarize_min_max( diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index 08cbf68fa617..416985983dfe 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -112,11 +112,6 @@ impl PhysicalOptimizer { // Remove the ancillary output requirement operator since we are done with the planning // phase. Arc::new(OutputRequirements::new_remove_mode()), - // The PipelineChecker rule will reject non-runnable query plans that use - // pipeline-breaking operators on infinite input(s). The rule generates a - // diagnostic error message when this happens. It makes no changes to the - // given query plan; i.e. it only acts as a final gatekeeping rule. - Arc::new(PipelineChecker::new()), // The aggregation limiter will try to find situations where the accumulator count // is not tied to the cardinality, i.e. when the output of the aggregation is passed // into an `order by max(x) limit y`. In this case it will copy the limit value down @@ -129,6 +124,11 @@ impl PhysicalOptimizer { // are not present, the load of executors such as join or union will be // reduced by narrowing their input tables. Arc::new(ProjectionPushdown::new()), + // The PipelineChecker rule will reject non-runnable query plans that use + // pipeline-breaking operators on infinite input(s). The rule generates a + // diagnostic error message when this happens. It makes no changes to the + // given query plan; i.e. it only acts as a final gatekeeping rule. 
+ Arc::new(PipelineChecker::new()), ]; Self::with_rules(rules) diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 2514324a9541..fe0c408dc114 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -22,11 +22,10 @@ use arrow::compute::{concat_batches, SortOptions}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; -use arrow_schema::{Field, Schema}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::windows::{ - create_window_expr, BoundedWindowAggExec, WindowAggExec, + create_window_expr, schema_add_window_field, BoundedWindowAggExec, WindowAggExec, }; use datafusion::physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted}; use datafusion::physical_plan::{collect, InputOrderMode}; @@ -40,7 +39,6 @@ use datafusion_expr::{ }; use datafusion_physical_expr::expressions::{cast, col, lit}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use itertools::Itertools; use test_utils::add_empty_batches; use hashbrown::HashMap; @@ -276,7 +274,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> { }; let extended_schema = - schema_add_window_fields(&args, &schema, &window_fn, fn_name)?; + schema_add_window_field(&args, &schema, &window_fn, fn_name)?; let window_expr = create_window_expr( &window_fn, @@ -683,7 +681,7 @@ async fn run_window_test( exec1 = Arc::new(SortExec::new(sort_keys, exec1)) as _; } - let extended_schema = schema_add_window_fields(&args, &schema, &window_fn, &fn_name)?; + let extended_schema = schema_add_window_field(&args, &schema, &window_fn, &fn_name)?; let usual_window_exec = Arc::new(WindowAggExec::try_new( vec![create_window_expr( @@ -754,32 +752,6 @@ async fn run_window_test( Ok(()) } -// The planner has fully updated schema before calling the `create_window_expr` -// Replicate the same for this test -fn schema_add_window_fields( - args: &[Arc], - schema: &Arc, - window_fn: &WindowFunctionDefinition, - fn_name: &str, -) -> Result> { - let data_types = args - .iter() - .map(|e| e.clone().as_ref().data_type(schema)) - .collect::>>()?; - let window_expr_return_type = window_fn.return_type(&data_types)?; - let mut window_fields = schema - .fields() - .iter() - .map(|f| f.as_ref().clone()) - .collect_vec(); - window_fields.extend_from_slice(&[Field::new( - fn_name, - window_expr_return_type, - true, - )]); - Ok(Arc::new(Schema::new(window_fields))) -} - /// Return randomly sized record batches with: /// three sorted int32 columns 'a', 'b', 'c' ranged from 0..DISTINCT as columns /// one random int32 column x diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index 80526d812b92..5f5070b7748b 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -726,10 +726,7 @@ impl AggregateUDFImpl for TestGroupsAccumulator { panic!("accumulator shouldn't invoke"); } - fn groups_accumulator_supported( - &self, - _args: GroupsAccumulatorSupportedArgs, - ) -> bool { + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { true } diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs index 466ffceeca8a..714cfa1af671 100644 --- a/datafusion/expr/src/function.rs +++ 
b/datafusion/expr/src/function.rs @@ -45,8 +45,10 @@ pub type ReturnTypeFunction = pub struct AccumulatorArgs<'a> { /// The return type of the aggregate function. pub data_type: &'a DataType, + /// The schema of the input arguments pub schema: &'a Schema, + /// Whether to ignore nulls. /// /// SQL allows the user to specify `IGNORE NULLS`, for example: @@ -67,42 +69,39 @@ pub struct AccumulatorArgs<'a> { /// /// If no `ORDER BY` is specified, `sort_exprs`` will be empty. pub sort_exprs: &'a [Expr], + + /// Whether the aggregate function is distinct. + /// + /// ```sql + /// SELECT COUNT(DISTINCT column1) FROM t; + /// ``` pub is_distinct: bool, - pub input_type: &'a DataType, -} -impl<'a> AccumulatorArgs<'a> { - pub fn new( - data_type: &'a DataType, - schema: &'a Schema, - ignore_nulls: bool, - sort_exprs: &'a [Expr], - is_distinct: bool, - input_type: &'a DataType, - ) -> Self { - Self { - data_type, - schema, - ignore_nulls, - sort_exprs, - is_distinct, - input_type, - } - } -} + /// The input type of the aggregate function. + pub input_type: &'a DataType, -/// [`GroupsAccumulatorSupportedArgs`] contains information to determine if an -/// aggregate function supports the groups accumulator. -pub struct GroupsAccumulatorSupportedArgs { + /// The number of arguments the aggregate function takes. pub args_num: usize, - pub is_distinct: bool, } +/// [`StateFieldsArgs`] contains information about the fields that an +/// aggregate function's accumulator should have. Used for [`AggregateUDFImpl::state_fields`]. +/// +/// [`AggregateUDFImpl::state_fields`]: crate::udaf::AggregateUDFImpl::state_fields pub struct StateFieldsArgs<'a> { + /// The name of the aggregate function. pub name: &'a str, + + /// The input type of the aggregate function. pub input_type: &'a DataType, + + /// The return type of the aggregate function. pub return_type: &'a DataType, + + /// The ordering fields of the aggregate function. pub ordering_fields: &'a [Field], + + /// Whether the aggregate function is distinct. pub is_distinct: bool, } diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 1bdd3c7ef09e..f01d6a07d8ab 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -18,8 +18,7 @@ //! [`AggregateUDF`]: User Defined Aggregate Functions use crate::function::{ - AccumulatorArgs, AggregateFunctionSimplification, GroupsAccumulatorSupportedArgs, - StateFieldsArgs, + AccumulatorArgs, AggregateFunctionSimplification, StateFieldsArgs, }; use crate::groups_accumulator::GroupsAccumulator; use crate::utils::format_state_name; @@ -185,10 +184,7 @@ impl AggregateUDF { } /// See [`AggregateUDFImpl::groups_accumulator_supported`] for more details. 
- pub fn groups_accumulator_supported( - &self, - args: GroupsAccumulatorSupportedArgs, - ) -> bool { + pub fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { self.inner.groups_accumulator_supported(args) } @@ -239,7 +235,7 @@ where /// # use arrow::datatypes::DataType; /// # use datafusion_common::{DataFusionError, plan_err, Result}; /// # use datafusion_expr::{col, ColumnarValue, Signature, Volatility, Expr}; -/// # use datafusion_expr::{AggregateUDFImpl, AggregateUDF, Accumulator, function::AccumulatorArgs, function::StateFieldsArgs}; +/// # use datafusion_expr::{AggregateUDFImpl, AggregateUDF, Accumulator, function::{AccumulatorArgs, StateFieldsArgs}}; /// # use arrow::datatypes::Schema; /// # use arrow::datatypes::Field; /// #[derive(Debug, Clone)] @@ -349,10 +345,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { /// `Self::accumulator` for certain queries, such as when this aggregate is /// used as a window function or when there no GROUP BY columns in the /// query. - fn groups_accumulator_supported( - &self, - _args: GroupsAccumulatorSupportedArgs, - ) -> bool { + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { false } @@ -398,6 +391,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { None } + /// Returns the reverse expression of the aggregate function. fn reverse_expr(&self) -> ReversedUDAF { ReversedUDAF::NotSupported } diff --git a/datafusion/functions/src/datetime/common.rs b/datafusion/functions/src/datetime/common.rs index f0689ffd64e9..4f48ab188403 100644 --- a/datafusion/functions/src/datetime/common.rs +++ b/datafusion/functions/src/datetime/common.rs @@ -93,7 +93,9 @@ pub(crate) fn string_to_datetime_formatted( if let Err(e) = &dt { // no timezone or other failure, try without a timezone - let ndt = parsed.to_naive_datetime_with_offset(0); + let ndt = parsed + .to_naive_datetime_with_offset(0) + .or_else(|_| parsed.to_naive_date().map(|nd| nd.into())); if let Err(e) = &ndt { return Err(err(&e.to_string())); } diff --git a/datafusion/functions/src/datetime/to_timestamp.rs b/datafusion/functions/src/datetime/to_timestamp.rs index a7bcca62944c..af878b4505bc 100644 --- a/datafusion/functions/src/datetime/to_timestamp.rs +++ b/datafusion/functions/src/datetime/to_timestamp.rs @@ -670,6 +670,10 @@ mod tests { parse_timestamp_formatted("09-08-2020 13/42/29", "%m-%d-%Y %H/%M/%S") .unwrap() ); + assert_eq!( + 1642896000000000000, + parse_timestamp_formatted("2022-01-23", "%Y-%m-%d").unwrap() + ); } fn parse_timestamp_formatted(s: &str, format: &str) -> Result { diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index d2c5c7bc3055..e366e663e0ba 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType, IntervalUnit}; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_common::{ exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema, DataFusionError, Result, ScalarValue, @@ -31,8 +31,8 @@ use datafusion_expr::expr::{ self, AggregateFunctionDefinition, Between, BinaryExpr, Case, Exists, InList, InSubquery, Like, ScalarFunction, WindowFunction, }; -use datafusion_expr::expr_rewriter::rewrite_preserving_name; use datafusion_expr::expr_schema::cast_subquery; +use 
datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::logical_plan::Subquery; use datafusion_expr::type_coercion::binary::{ comparison_coercion, get_input_types, like_coercion, @@ -52,6 +52,7 @@ use datafusion_expr::{ }; use crate::analyzer::AnalyzerRule; +use crate::utils::NamePreserver; #[derive(Default)] pub struct TypeCoercion {} @@ -68,26 +69,28 @@ impl AnalyzerRule for TypeCoercion { } fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - analyze_internal(&DFSchema::empty(), &plan) + let empty_schema = DFSchema::empty(); + + let transformed_plan = plan + .transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))? + .data; + + Ok(transformed_plan) } } +/// use the external schema to handle the correlated subqueries case +/// +/// Assumes that children have already been optimized fn analyze_internal( - // use the external schema to handle the correlated subqueries case external_schema: &DFSchema, - plan: &LogicalPlan, -) -> Result { - // optimize child plans first - let new_inputs = plan - .inputs() - .iter() - .map(|p| analyze_internal(external_schema, p)) - .collect::>>()?; + plan: LogicalPlan, +) -> Result> { // get schema representing all available input fields. This is used for data type // resolution only, so order does not matter here - let mut schema = merge_schema(new_inputs.iter().collect()); + let mut schema = merge_schema(plan.inputs()); - if let LogicalPlan::TableScan(ts) = plan { + if let LogicalPlan::TableScan(ts) = &plan { let source_schema = DFSchema::try_from_qualified_schema( ts.table_name.clone(), &ts.source.schema(), @@ -100,25 +103,75 @@ fn analyze_internal( // select t2.c2 from t1 where t1.c1 in (select t2.c1 from t2 where t2.c2=t1.c3) schema.merge(external_schema); - let mut expr_rewrite = TypeCoercionRewriter { schema: &schema }; - - let new_expr = plan - .expressions() - .into_iter() - .map(|expr| { - // ensure aggregate names don't change: - // https://github.com/apache/datafusion/issues/3555 - rewrite_preserving_name(expr, &mut expr_rewrite) - }) - .collect::>>()?; - - plan.with_new_exprs(new_expr, new_inputs) + let mut expr_rewrite = TypeCoercionRewriter::new(&schema); + + let name_preserver = NamePreserver::new(&plan); + // apply coercion rewrite all expressions in the plan individually + plan.map_expressions(|expr| { + let original_name = name_preserver.save(&expr)?; + expr.rewrite(&mut expr_rewrite)? + .map_data(|expr| original_name.restore(expr)) + })? + // coerce join expressions specially + .map_data(|plan| expr_rewrite.coerce_joins(plan))? 
+ // recompute the schema after the expressions have been rewritten as the types may have changed + .map_data(|plan| plan.recompute_schema()) } pub(crate) struct TypeCoercionRewriter<'a> { pub(crate) schema: &'a DFSchema, } +impl<'a> TypeCoercionRewriter<'a> { + fn new(schema: &'a DFSchema) -> Self { + Self { schema } + } + + /// Coerce join equality expressions + /// + /// Joins must be treated specially as their equality expressions are stored + /// as a parallel list of left and right expressions, rather than a single + /// equality expression + /// + /// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored + /// as a list of `(t1.a, t2.b), (t1.x, t2.y)` + fn coerce_joins(&mut self, plan: LogicalPlan) -> Result { + let LogicalPlan::Join(mut join) = plan else { + return Ok(plan); + }; + + join.on = join + .on + .into_iter() + .map(|(lhs, rhs)| { + // coerce the arguments as though they were a single binary equality + // expression + let (lhs, rhs) = self.coerce_binary_op(lhs, Operator::Eq, rhs)?; + Ok((lhs, rhs)) + }) + .collect::>>()?; + + Ok(LogicalPlan::Join(join)) + } + + fn coerce_binary_op( + &self, + left: Expr, + op: Operator, + right: Expr, + ) -> Result<(Expr, Expr)> { + let (left_type, right_type) = get_input_types( + &left.get_type(self.schema)?, + &op, + &right.get_type(self.schema)?, + )?; + Ok(( + left.cast_to(&left_type, self.schema)?, + right.cast_to(&right_type, self.schema)?, + )) + } +} + impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { type Node = Expr; @@ -131,14 +184,15 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { subquery, outer_ref_columns, }) => { - let new_plan = analyze_internal(self.schema, &subquery)?; + let new_plan = analyze_internal(self.schema, unwrap_arc(subquery))?.data; Ok(Transformed::yes(Expr::ScalarSubquery(Subquery { subquery: Arc::new(new_plan), outer_ref_columns, }))) } Expr::Exists(Exists { subquery, negated }) => { - let new_plan = analyze_internal(self.schema, &subquery.subquery)?; + let new_plan = + analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data; Ok(Transformed::yes(Expr::Exists(Exists { subquery: Subquery { subquery: Arc::new(new_plan), @@ -152,7 +206,8 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { subquery, negated, }) => { - let new_plan = analyze_internal(self.schema, &subquery.subquery)?; + let new_plan = + analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data; let expr_type = expr.get_type(self.schema)?; let subquery_type = new_plan.schema().field(0).data_type(); let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(plan_datafusion_err!( @@ -221,15 +276,11 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { )))) } Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - let (left_type, right_type) = get_input_types( - &left.get_type(self.schema)?, - &op, - &right.get_type(self.schema)?, - )?; + let (left, right) = self.coerce_binary_op(*left, op, *right)?; Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new( - Box::new(left.cast_to(&left_type, self.schema)?), + Box::new(left), op, - Box::new(right.cast_to(&right_type, self.schema)?), + Box::new(right), )))) } Expr::Between(Between { diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 1af246fc556d..9190881335af 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -17,6 +17,7 @@ //! 
[`PushDownLimit`] pushes `LIMIT` earlier in the query plan +use std::cmp::min; use std::sync::Arc; use crate::optimizer::ApplyOrder; @@ -56,47 +57,12 @@ impl OptimizerRule for PushDownLimit { if let LogicalPlan::Limit(child) = &*limit.input { // Merge the Parent Limit and the Child Limit. - - // Case 0: Parent and Child are disjoint. (child_fetch <= skip) - // Before merging: - // |........skip........|---fetch-->| Parent Limit - // |...child_skip...|---child_fetch-->| Child Limit - // After merging: - // |.........(child_skip + skip).........| - // Before merging: - // |...skip...|------------fetch------------>| Parent Limit - // |...child_skip...|-------------child_fetch------------>| Child Limit - // After merging: - // |....(child_skip + skip)....|---(child_fetch - skip)-->| - - // Case 1: Parent is beyond the range of Child. (skip < child_fetch <= skip + fetch) - // Before merging: - // |...skip...|------------fetch------------>| Parent Limit - // |...child_skip...|-------------child_fetch------------>| Child Limit - // After merging: - // |....(child_skip + skip)....|---(child_fetch - skip)-->| - - // Case 2: Parent is in the range of Child. (skip + fetch < child_fetch) - // Before merging: - // |...skip...|---fetch-->| Parent Limit - // |...child_skip...|-------------child_fetch------------>| Child Limit - // After merging: - // |....(child_skip + skip)....|---fetch-->| - let parent_skip = limit.skip; - let new_fetch = match (limit.fetch, child.fetch) { - (Some(fetch), Some(child_fetch)) => { - Some(min(fetch, child_fetch.saturating_sub(parent_skip))) - } - (Some(fetch), None) => Some(fetch), - (None, Some(child_fetch)) => { - Some(child_fetch.saturating_sub(parent_skip)) - } - (None, None) => None, - }; + let (skip, fetch) = + combine_limit(limit.skip, limit.fetch, child.skip, child.fetch); let plan = LogicalPlan::Limit(Limit { - skip: child.skip + parent_skip, - fetch: new_fetch, + skip, + fetch, input: Arc::new((*child.input).clone()), }); return self @@ -217,6 +183,78 @@ impl OptimizerRule for PushDownLimit { } } +/// Combines two limits into a single +/// +/// Returns the combined limit `(skip, fetch)` +/// +/// # Case 0: Parent and Child are disjoint. (`child_fetch <= skip`) +/// +/// ```text +/// Before merging: +/// |........skip........|---fetch-->| Parent Limit +/// |...child_skip...|---child_fetch-->| Child Limit +/// ``` +/// +/// After merging: +/// ```text +/// |.........(child_skip + skip).........| +/// ``` +/// +/// Before merging: +/// ```text +/// |...skip...|------------fetch------------>| Parent Limit +/// |...child_skip...|-------------child_fetch------------>| Child Limit +/// ``` +/// +/// After merging: +/// ```text +/// |....(child_skip + skip)....|---(child_fetch - skip)-->| +/// ``` +/// +/// # Case 1: Parent is beyond the range of Child. (`skip < child_fetch <= skip + fetch`) +/// +/// Before merging: +/// ```text +/// |...skip...|------------fetch------------>| Parent Limit +/// |...child_skip...|-------------child_fetch------------>| Child Limit +/// ``` +/// +/// After merging: +/// ```text +/// |....(child_skip + skip)....|---(child_fetch - skip)-->| +/// ``` +/// +/// # Case 2: Parent is in the range of Child. 
(`skip + fetch < child_fetch`) +/// Before merging: +/// ```text +/// |...skip...|---fetch-->| Parent Limit +/// |...child_skip...|-------------child_fetch------------>| Child Limit +/// ``` +/// +/// After merging: +/// ```text +/// |....(child_skip + skip)....|---fetch-->| +/// ``` +fn combine_limit( + parent_skip: usize, + parent_fetch: Option, + child_skip: usize, + child_fetch: Option, +) -> (usize, Option) { + let combined_skip = child_skip.saturating_add(parent_skip); + + let combined_fetch = match (parent_fetch, child_fetch) { + (Some(parent_fetch), Some(child_fetch)) => { + Some(min(parent_fetch, child_fetch.saturating_sub(parent_skip))) + } + (Some(parent_fetch), None) => Some(parent_fetch), + (None, Some(child_fetch)) => Some(child_fetch.saturating_sub(parent_skip)), + (None, None) => None, + }; + + (combined_skip, combined_fetch) +} + fn push_down_join(join: &Join, limit: usize) -> Option { use JoinType::*; diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index f71516b8d71b..455d659fb25e 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -1759,7 +1759,7 @@ fn inlist_except(mut l1: InList, l2: InList) -> Result { mod tests { use datafusion_common::{assert_contains, DFSchemaRef, ToDFSchema}; use datafusion_expr::{ - function::{AggregateFunctionSimplification, GroupsAccumulatorSupportedArgs}, + function::{AccumulatorArgs, AggregateFunctionSimplification}, interval_arithmetic::Interval, *, }; @@ -3785,10 +3785,7 @@ mod tests { unimplemented!("not needed for tests") } - fn groups_accumulator_supported( - &self, - _args: GroupsAccumulatorSupportedArgs, - ) -> bool { + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { unimplemented!("not needed for testing") } diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs index 827761e28569..7ffd4b5af10b 100644 --- a/datafusion/physical-expr-common/src/aggregate/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/mod.rs @@ -22,7 +22,7 @@ pub mod utils; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{not_impl_err, Result}; -use datafusion_expr::function::{GroupsAccumulatorSupportedArgs, StateFieldsArgs}; +use datafusion_expr::function::StateFieldsArgs; use datafusion_expr::type_coercion::aggregates::check_arg_count; use datafusion_expr::ReversedUDAF; use datafusion_expr::{ @@ -198,7 +198,7 @@ impl AggregateExpr for AggregateFunctionExpr { fn state_fields(&self) -> Result> { let args = StateFieldsArgs { - name: self.name(), + name: &self.name, input_type: &self.input_type, return_type: &self.data_type, ordering_fields: &self.ordering_fields, @@ -213,14 +213,15 @@ impl AggregateExpr for AggregateFunctionExpr { } fn create_accumulator(&self) -> Result> { - let acc_args = AccumulatorArgs::new( - &self.data_type, - &self.schema, - self.ignore_nulls, - &self.sort_exprs, - self.is_distinct, - &self.input_type, - ); + let acc_args = AccumulatorArgs { + data_type: &self.data_type, + schema: &self.schema, + ignore_nulls: self.ignore_nulls, + sort_exprs: &self.sort_exprs, + is_distinct: self.is_distinct, + input_type: &self.input_type, + args_num: self.args.len(), + }; self.fun.accumulator(acc_args) } @@ -285,11 +286,15 @@ impl AggregateExpr for AggregateFunctionExpr { } fn groups_accumulator_supported(&self) -> bool { - let args = 
GroupsAccumulatorSupportedArgs { - args_num: self.args.len(), + let args = AccumulatorArgs { + data_type: &self.data_type, + schema: &self.schema, + ignore_nulls: self.ignore_nulls, + sort_exprs: &self.sort_exprs, is_distinct: self.is_distinct, + input_type: &self.input_type, + args_num: self.args.len(), }; - self.fun.groups_accumulator_supported(args) } diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index b8671c39a943..244a44acdcb5 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -153,12 +153,11 @@ impl Accumulator for DistinctArrayAggAccumulator { return Ok(()); } - let array = &states[0]; - - assert_eq!(array.len(), 1, "state array should only include 1 row!"); - // Unwrap outer ListArray then do update batch - let inner_array = array.as_list::().value(0); - self.update_batch(&[inner_array]) + states[0] + .as_list::() + .iter() + .flatten() + .try_for_each(|val| self.update_batch(&[val])) } fn evaluate(&mut self) -> Result { diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index d1223f78808c..42c630741cc9 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -42,6 +42,7 @@ use datafusion_physical_expr::{ window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr}, AggregateExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement, }; +use itertools::Itertools; mod bounded_window_agg_exec; mod window_agg_exec; @@ -52,6 +53,31 @@ pub use datafusion_physical_expr::window::{ }; pub use window_agg_exec::WindowAggExec; +/// Build field from window function and add it into schema +pub fn schema_add_window_field( + args: &[Arc], + schema: &Schema, + window_fn: &WindowFunctionDefinition, + fn_name: &str, +) -> Result> { + let data_types = args + .iter() + .map(|e| e.clone().as_ref().data_type(schema)) + .collect::>>()?; + let window_expr_return_type = window_fn.return_type(&data_types)?; + let mut window_fields = schema + .fields() + .iter() + .map(|f| f.as_ref().clone()) + .collect_vec(); + window_fields.extend_from_slice(&[Field::new( + fn_name, + window_expr_return_type, + false, + )]); + Ok(Arc::new(Schema::new(window_fields))) +} + /// Create a physical expression for window function #[allow(clippy::too_many_arguments)] pub fn create_window_expr( diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index c907e991fb86..a290f30586ce 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -40,7 +40,7 @@ use datafusion::physical_plan::expressions::{ in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, }; -use datafusion::physical_plan::windows::create_window_expr; +use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field}; use datafusion::physical_plan::{ ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr, }; @@ -155,14 +155,18 @@ pub fn parse_physical_window_expr( ) })?; + let fun: WindowFunctionDefinition = convert_required!(proto.window_function)?; + let name = proto.name.clone(); + let extended_schema = + schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?; create_window_expr( - &convert_required!(proto.window_function)?, - 
proto.name.clone(), + &fun, + name, &window_node_expr, &partition_by, &order_by, Arc::new(window_frame), - input_schema, + &extended_schema, false, ) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 30a28081edff..dd8e450d3165 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -253,8 +253,7 @@ fn roundtrip_nested_loop_join() -> Result<()> { fn roundtrip_window() -> Result<()> { let field_a = Field::new("a", DataType::Int64, false); let field_b = Field::new("b", DataType::Int64, false); - let field_c = Field::new("FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a, field_b, field_c])); + let schema = Arc::new(Schema::new(vec![field_a, field_b])); let window_frame = WindowFrame::new_bounds( datafusion_expr::WindowFrameUnits::Range, diff --git a/datafusion/sql/src/unparser/expr.rs b/datafusion/sql/src/unparser/expr.rs index 804fa6d306b4..23e3d9ab3594 100644 --- a/datafusion/sql/src/unparser/expr.rs +++ b/datafusion/sql/src/unparser/expr.rs @@ -391,7 +391,9 @@ impl Unparser<'_> { Expr::ScalarVariable(_, _) => { not_impl_err!("Unsupported Expr conversion: {expr:?}") } - Expr::IsNull(_) => not_impl_err!("Unsupported Expr conversion: {expr:?}"), + Expr::IsNull(expr) => { + Ok(ast::Expr::IsNull(Box::new(self.expr_to_sql(expr)?))) + } Expr::IsNotFalse(_) => not_impl_err!("Unsupported Expr conversion: {expr:?}"), Expr::GetIndexedField(_) => { not_impl_err!("Unsupported Expr conversion: {expr:?}") @@ -863,7 +865,7 @@ mod tests { use datafusion_expr::{ case, col, exists, expr::{AggregateFunction, AggregateFunctionDefinition}, - lit, not, not_exists, table_scan, wildcard, ColumnarValue, ScalarUDF, + lit, not, not_exists, table_scan, when, wildcard, ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, Volatility, WindowFrame, WindowFunctionDefinition, }; @@ -933,6 +935,14 @@ mod tests { .otherwise(lit(ScalarValue::Null))?, r#"CASE "a" WHEN 1 THEN true WHEN 0 THEN false ELSE NULL END"#, ), + ( + when(col("a").is_null(), lit(true)).otherwise(lit(false))?, + r#"CASE WHEN "a" IS NULL THEN true ELSE false END"#, + ), + ( + when(col("a").is_not_null(), lit(true)).otherwise(lit(false))?, + r#"CASE WHEN "a" IS NOT NULL THEN true ELSE false END"#, + ), ( Expr::Cast(Cast { expr: Box::new(col("a")), @@ -959,6 +969,18 @@ mod tests { ScalarUDF::new_from_impl(DummyUDF::new()).call(vec![col("a"), col("b")]), r#"dummy_udf("a", "b")"#, ), + ( + ScalarUDF::new_from_impl(DummyUDF::new()) + .call(vec![col("a"), col("b")]) + .is_null(), + r#"dummy_udf("a", "b") IS NULL"#, + ), + ( + ScalarUDF::new_from_impl(DummyUDF::new()) + .call(vec![col("a"), col("b")]) + .is_not_null(), + r#"dummy_udf("a", "b") IS NOT NULL"#, + ), ( Expr::Like(Like { negated: true, @@ -1081,6 +1103,7 @@ mod tests { r#"COUNT(*) OVER (ORDER BY "a" DESC NULLS FIRST RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING)"#, ), (col("a").is_not_null(), r#""a" IS NOT NULL"#), + (col("a").is_null(), r#""a" IS NULL"#), ( (col("a") + col("b")).gt(lit(4)).is_true(), r#"(("a" + "b") > 4) IS TRUE"#, diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 40d66f9b52ce..78421d0b6431 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -198,6 +198,73 @@ 
statement error This feature is not implemented: LIMIT not supported in ARRAY_AG SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100 +# Test distinct aggregate function with merge batch +query II +with A as ( + select 1 as id, 2 as foo + UNION ALL + select 1, null + UNION ALL + select 1, null + UNION ALL + select 1, 3 + UNION ALL + select 1, 2 + ---- The order is non-deterministic, verify with length +) select array_length(array_agg(distinct a.foo)), sum(distinct 1) from A a group by a.id; +---- +3 1 + +# It has only AggregateExec with FinalPartitioned mode, so `merge_batch` is used +# If the plan is changed, whether the `merge_batch` is used should be verified to ensure the test coverage +query TT +explain with A as ( + select 1 as id, 2 as foo + UNION ALL + select 1, null + UNION ALL + select 1, null + UNION ALL + select 1, 3 + UNION ALL + select 1, 2 +) select array_length(array_agg(distinct a.foo)), sum(distinct 1) from A a group by a.id; +---- +logical_plan +01)Projection: array_length(ARRAY_AGG(DISTINCT a.foo)), SUM(DISTINCT Int64(1)) +02)--Aggregate: groupBy=[[a.id]], aggr=[[ARRAY_AGG(DISTINCT a.foo), SUM(DISTINCT Int64(1))]] +03)----SubqueryAlias: a +04)------SubqueryAlias: a +05)--------Union +06)----------Projection: Int64(1) AS id, Int64(2) AS foo +07)------------EmptyRelation +08)----------Projection: Int64(1) AS id, Int64(NULL) AS foo +09)------------EmptyRelation +10)----------Projection: Int64(1) AS id, Int64(NULL) AS foo +11)------------EmptyRelation +12)----------Projection: Int64(1) AS id, Int64(3) AS foo +13)------------EmptyRelation +14)----------Projection: Int64(1) AS id, Int64(2) AS foo +15)------------EmptyRelation +physical_plan +01)ProjectionExec: expr=[array_length(ARRAY_AGG(DISTINCT a.foo)@1) as array_length(ARRAY_AGG(DISTINCT a.foo)), SUM(DISTINCT Int64(1))@2 as SUM(DISTINCT Int64(1))] +02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[ARRAY_AGG(DISTINCT a.foo), SUM(DISTINCT Int64(1))] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5 +05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[ARRAY_AGG(DISTINCT a.foo), SUM(DISTINCT Int64(1))] +06)----------UnionExec +07)------------ProjectionExec: expr=[1 as id, 2 as foo] +08)--------------PlaceholderRowExec +09)------------ProjectionExec: expr=[1 as id, NULL as foo] +10)--------------PlaceholderRowExec +11)------------ProjectionExec: expr=[1 as id, NULL as foo] +12)--------------PlaceholderRowExec +13)------------ProjectionExec: expr=[1 as id, 3 as foo] +14)--------------PlaceholderRowExec +15)------------ProjectionExec: expr=[1 as id, 2 as foo] +16)--------------PlaceholderRowExec + + # FIX: custom absolute values # csv_query_avg_multi_batch diff --git a/datafusion/sqllogictest/test_files/dates.slt b/datafusion/sqllogictest/test_files/dates.slt index 32c0bd14e7cc..e21637bd8913 100644 --- a/datafusion/sqllogictest/test_files/dates.slt +++ b/datafusion/sqllogictest/test_files/dates.slt @@ -224,5 +224,11 @@ SELECT to_date(t.ts, '%Y-%m-%d %H/%M/%S%#z', '%s', '%q', '%d-%m-%Y %H:%M:%S%#z', query error function unsupported data type at index 1: SELECT to_date(t.ts, make_array('%Y-%m-%d %H/%M/%S%#z', '%s', '%q', '%d-%m-%Y %H:%M:%S%#z', '%+')) from ts_utf8_data as t +# verify to_date with format +query D +select to_date('2022-01-23', '%Y-%m-%d'); +---- +2022-01-23 + statement ok drop table ts_utf8_data diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 
3a4ac747ebd6..92c537f975ad 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -252,9 +252,9 @@ physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true -physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE +physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] @@ -311,9 +311,9 @@ physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE +physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] @@ -348,9 +348,9 @@ physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements 01)GlobalLimitExec: skip=0, fetch=10 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10 -physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE +physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan 01)GlobalLimitExec: skip=0, fetch=10 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, 
date_string_col, string_col, timestamp_col], limit=10 diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index 13fb8fba0d31..5f75bca4f0fa 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -2795,3 +2795,9 @@ SELECT '2000-12-01 04:04:12' AT TIME ZONE 'America/New York'; # abbreviated timezone is not supported statement error SELECT '2023-03-12 02:00:00' AT TIME ZONE 'EDT'; + +# Test current_time without parentheses +query B +select current_time = current_time; +---- +true diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index dce8ce10b587..e4be6e68ff16 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -39,7 +39,7 @@ itertools = { workspace = true } object_store = { workspace = true } prost = "0.12" prost-types = "0.12" -substrait = "0.32.0" +substrait = "0.33.3" [dev-dependencies] tokio = { workspace = true }