From be02f030e71708e979dc9f80df046836622c3562 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Mon, 11 Nov 2024 10:29:04 +0300 Subject: [PATCH] Update roundtrip_physical_plan.rs --- .../tests/cases/roundtrip_physical_plan.rs | 53 ++++++++++--------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 88939b5bccf4..5cea26356e4b 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -24,7 +24,7 @@ use std::vec; use arrow::array::RecordBatch; use arrow::csv::WriterBuilder; use arrow::datatypes::{Fields, TimeUnit}; -use datafusion::physical_expr::aggregate::AggregateExprBuilder; +use datafusion::physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_expr::dml::InsertOp; use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf; @@ -269,34 +269,33 @@ fn roundtrip_window() -> Result<()> { let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - let _window_frame = WindowFrame::new_bounds( + let window_frame = WindowFrame::new_bounds( datafusion_expr::WindowFrameUnits::Range, WindowFrameBound::Preceding(ScalarValue::Int64(None)), WindowFrameBound::CurrentRow, ); - // let udwf_window_expr = Expr::WindowFunction(WindowFunction::new( - // WindowFunctionDefinition::WindowUDF(first_value_udwf()), - - // let builtin_window_expr = Arc::new(BuiltInWindowExpr::new( - // Arc::new(NthValue::first( - // "FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", - // col("a", &schema)?, - // DataType::Int64, - // false, - // )), - // &[col("b", &schema)?], - // &LexOrdering{ - // inner: vec![PhysicalSortExpr { - // expr: col("a", &schema)?, - // options: SortOptions { - // descending: false, - // nulls_first: false, - // }, - // }] - // }, - // Arc::new(window_frame), - // )); + let args = vec![cast(col("a", &schema)?, &schema, DataType::Int64)?]; + let nth_value_expr = AggregateExprBuilder::new(nth_value_udaf(), args) + .order_by(LexOrdering { + inner: vec![PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions { + descending: false, + nulls_first: false, + }, + }], + }) + .schema(Arc::clone(&schema)) + .alias("FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW") + .build() + .map(Arc::new)?; + let sliding_aggr_window_nth_value = Arc::new(SlidingAggregateWindowExpr::new( + nth_value_expr, + &[col("b", &schema)?], + &LexOrdering::default(), + Arc::new(window_frame), + )); let plain_aggr_window_expr = Arc::new(PlainAggregateWindowExpr::new( AggregateExprBuilder::new( @@ -335,7 +334,11 @@ fn roundtrip_window() -> Result<()> { let input = Arc::new(EmptyExec::new(schema.clone())); roundtrip_test(Arc::new(WindowAggExec::try_new( - vec![plain_aggr_window_expr, sliding_aggr_window_expr], + vec![ + plain_aggr_window_expr, + sliding_aggr_window_expr, + sliding_aggr_window_nth_value, + ], input, vec![col("b", &schema)?], )?))