Skip to content

Commit

Permalink
Update roundtrip_physical_plan.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Nov 11, 2024
1 parent aa18c1d commit be02f03
Showing 1 changed file with 28 additions and 25 deletions.
53 changes: 28 additions & 25 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::vec;
use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
use arrow::datatypes::{Fields, TimeUnit};
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_expr::dml::InsertOp;
use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf;
Expand Down Expand Up @@ -269,34 +269,33 @@ fn roundtrip_window() -> Result<()> {
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));

let _window_frame = WindowFrame::new_bounds(
let window_frame = WindowFrame::new_bounds(
datafusion_expr::WindowFrameUnits::Range,
WindowFrameBound::Preceding(ScalarValue::Int64(None)),
WindowFrameBound::CurrentRow,
);

// let udwf_window_expr = Expr::WindowFunction(WindowFunction::new(
// WindowFunctionDefinition::WindowUDF(first_value_udwf()),

// let builtin_window_expr = Arc::new(BuiltInWindowExpr::new(
// Arc::new(NthValue::first(
// "FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW",
// col("a", &schema)?,
// DataType::Int64,
// false,
// )),
// &[col("b", &schema)?],
// &LexOrdering{
// inner: vec![PhysicalSortExpr {
// expr: col("a", &schema)?,
// options: SortOptions {
// descending: false,
// nulls_first: false,
// },
// }]
// },
// Arc::new(window_frame),
// ));
let args = vec![cast(col("a", &schema)?, &schema, DataType::Int64)?];
let nth_value_expr = AggregateExprBuilder::new(nth_value_udaf(), args)
.order_by(LexOrdering {
inner: vec![PhysicalSortExpr {
expr: col("a", &schema)?,
options: SortOptions {
descending: false,
nulls_first: false,
},
}],
})
.schema(Arc::clone(&schema))
.alias("FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW")
.build()
.map(Arc::new)?;
let sliding_aggr_window_nth_value = Arc::new(SlidingAggregateWindowExpr::new(
nth_value_expr,
&[col("b", &schema)?],
&LexOrdering::default(),
Arc::new(window_frame),
));

let plain_aggr_window_expr = Arc::new(PlainAggregateWindowExpr::new(
AggregateExprBuilder::new(
Expand Down Expand Up @@ -335,7 +334,11 @@ fn roundtrip_window() -> Result<()> {
let input = Arc::new(EmptyExec::new(schema.clone()));

roundtrip_test(Arc::new(WindowAggExec::try_new(
vec![plain_aggr_window_expr, sliding_aggr_window_expr],
vec![
plain_aggr_window_expr,
sliding_aggr_window_expr,
sliding_aggr_window_nth_value,
],
input,
vec![col("b", &schema)?],
)?))
Expand Down

0 comments on commit be02f03

Please sign in to comment.