
Error when serializing physical window functions to proto #13401

Closed
mwylde opened this issue Nov 14, 2024 · 5 comments · Fixed by #13421
Labels
bug Something isn't working

Comments

@mwylde

mwylde commented Nov 14, 2024

Describe the bug

Recently, window functions were migrated to UDFs (as part of #8709). This appears to have broken protobuf serialization of some types of window functions (specifically, those that get planned into BuiltInWindowExpr), producing an error like:

```text
BuiltIn function not supported: WindowUDFExpr { fun: WindowUDF { inner: RowNumber { signature: Signature { type_signature: Any(0), volatility: Immutable } } }, args: [], name: "row_number() PARTITION BY [window] ORDER BY [count DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", input_types: [], is_reversed: false, ignore_nulls: false }
```

An example query that triggers this is:

```sql
CREATE TABLE test (
    a BIGINT,
    b BIGINT
);

SELECT
    a,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS row_num
FROM
    test;
```

which produces the logical plan:

```text
...
                window_expr: [
                    WindowFunction(
                        WindowFunction {
                            fun: WindowUDF(WindowUDF { inner: RowNumber }),
                            args: [],
                            partition_by: [Column(Column { relation: Some(Bare { table: "test" }), name: "a" })],
                            order_by: [
                                Sort { expr: Column(Column { relation: Some(Bare { table: "test" }), name: "b" }), asc: false, nulls_first: true }
                            ],
                            window_frame: WindowFrame {
                                units: Range,
                                start_bound: Preceding(NULL),
                                end_bound: CurrentRow
                            }
                        }
                    )
                ],
...
```

and the physical plan:

```text
    input: BoundedWindowAggExec {
        input: SortExec {
            input: MemoryExec { ... },
            expr: LexOrdering {...},
        },
        window_expr: [
            BuiltInWindowExpr {
                expr: WindowUDFExpr {
                    fun: WindowUDF { inner: RowNumber },
                    args: [],
                    name: "row_number() PARTITION BY [test.a] ORDER BY [test.b DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW",
                    partition_by: [Column { name: "a", index: 0 }],
                    order_by: LexOrdering {
                        inner: [
                            PhysicalSortExpr { expr: Column { name: "b", index: 1 }, options: SortOptions { descending: true, nulls_first: true } }
                        ]
                    },
                    window_frame: WindowFrame {
                        units: Range,
                        start_bound: Preceding(Int64(NULL)),
                        end_bound: CurrentRow
                    }
                }
            }
        ],
        schema: Schema { ... },
        partition_keys: [Column { name: "a", index: 0 }]
    }
```

The error comes from here:

```rust
return not_impl_err!("WindowExpr not supported: {window_expr:?}");
```

As we can see, there is no logic handling the BuiltInWindowExpr case.
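To illustrate the failure mode, here is a minimal sketch of a downcast-based serializer. The types and the `serialize` function are hypothetical stand-ins, not DataFusion's definitions. The point is only that any concrete window expression type the downcast chain does not name falls through to the "not supported" error.

```rust
use std::any::Any;
use std::fmt::Debug;

// Hypothetical stand-in for a physical window expression trait object.
trait WindowExpr: Debug {
    fn as_any(&self) -> &dyn Any;
}

#[derive(Debug)]
struct PlainAggregateWindowExpr {
    name: String,
}

#[derive(Debug)]
struct BuiltInWindowExpr {
    name: String,
}

impl WindowExpr for PlainAggregateWindowExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl WindowExpr for BuiltInWindowExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// This serializer only knows one concrete type; every other type falls
// through to a "not implemented" style error.
fn serialize(expr: &dyn WindowExpr) -> Result<String, String> {
    if let Some(agg) = expr.as_any().downcast_ref::<PlainAggregateWindowExpr>() {
        Ok(format!("aggregate:{}", agg.name))
    } else {
        Err(format!("WindowExpr not supported: {expr:?}"))
    }
}

fn main() {
    let agg = PlainAggregateWindowExpr { name: "sum".to_string() };
    let built_in = BuiltInWindowExpr { name: "row_number".to_string() };
    println!("{:?}", serialize(&agg));
    println!("{:?}", serialize(&built_in));
}
```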

To Reproduce

Here's a test case that demonstrates the issue:

```rust
// Assumes the imports and `roundtrip_test` helper of DataFusion's
// physical-plan roundtrip tests.
#[test]
fn roundtrip_built_in_window() -> Result<()> {
    let field_a = Field::new("a", DataType::Int64, false);
    let field_b = Field::new("b", DataType::Int64, false);
    let schema = Arc::new(Schema::new(vec![field_a, field_b]));

    // row_number() is now a user-defined window function (UDWF)
    let udf = Arc::new(WindowUDF::new_from_impl(RowNumber::new()));

    // Wrap the UDWF in a BuiltInWindowExpr, as the physical planner does
    let built_in_window_expr = Arc::new(BuiltInWindowExpr::new(
        create_udwf_window_expr(
            &udf,
            &[],
            &*schema,
            "row_number() PARTITION BY [a] ORDER BY [b] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".to_string(),
            false,
        )?,
        // PARTITION BY a
        &[col("a", &*schema)?],
        // ORDER BY b DESC NULLS FIRST
        &LexOrdering::new(vec![PhysicalSortExpr::new(
            col("b", &*schema)?,
            SortOptions::new(true, true),
        )]),
        Arc::new(WindowFrame::new(None)),
    ));

    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));

    // Serializing this plan to protobuf currently fails
    roundtrip_test(Arc::new(BoundedWindowAggExec::try_new(
        vec![built_in_window_expr],
        input,
        vec![col("a", &schema)?],
        InputOrderMode::Sorted,
    )?))
}
```

Expected behavior

We should be able to serialize window functions to protobuf.

Additional context

No response

mwylde added the bug (Something isn't working) label on Nov 14, 2024
@alamb
Contributor

alamb commented Nov 14, 2024

Yes, this seems like a bug, @mwylde.

I don't know why this stopped working or why it isn't covered by tests.

Note that just yesterday @buraksenn and @jcsherin completed migrating the last of the window functions in #13201, and we are about to remove BuiltInWindowFunction entirely (in other words, I suspect this window-function serialization bug doesn't exist on main).

If you would like to ship this fix in a 43.1.0 release, we can make a branch-43 and start preparing it.

@buraksenn
Contributor

@alamb, we discussed this in the PR (#13201 (comment)). There was a problem with the physical plan roundtrip, but since the window sqllogictests (slt) passed, I thought there were no issues.

@jcsherin
Contributor

take

@mwylde
Author

mwylde commented Nov 14, 2024

> Note that just yesterday @buraksenn and @jcsherin completed migrating the last of the window functions in #13201 and we are about to remove BuiltInWindowFunction entirely (in other words I suspect the bug about serializing window functions doesn't exist on main)

Unfortunately, this does still exist on main, because BuiltInWindowExpr still exists and is still planned for queries like the one in the issue description. I think there may be some confusion (at least there is for me!) because of the name BuiltInWindowExpr. From my reading of the physical planning code, all LogicalPlan::Window nodes produce BuiltInWindowExpr, from this call:

But despite the name, BuiltInWindowExpr actually now wraps a UDWF.
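A simplified sketch of what that means for serialization, assuming a serializer branch that looks inside the wrapper: the wrapper's type says little, so the branch has to downcast the inner expression and record the UDWF by name. `WindowFunctionExpr`, the stripped-down `WindowUDFExpr`, and `serialize_built_in` are hypothetical simplifications, not DataFusion's API.

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical trait for the inner function expression held by the wrapper.
trait WindowFunctionExpr {
    fn as_any(&self) -> &dyn Any;
}

// Hypothetical, stripped-down UDWF-backed expression: only the function
// name and the argument count are modelled.
struct WindowUDFExpr {
    udf_name: String,
    num_args: usize,
}

impl WindowFunctionExpr for WindowUDFExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Despite its name, the wrapper just holds whatever inner expression it was given.
struct BuiltInWindowExpr {
    expr: Arc<dyn WindowFunctionExpr>,
}

// Handle the wrapper by inspecting its inner expression.
fn serialize_built_in(w: &BuiltInWindowExpr) -> Result<String, String> {
    match w.expr.as_any().downcast_ref::<WindowUDFExpr>() {
        Some(udf) => Ok(format!("udwf:{}/{}", udf.udf_name, udf.num_args)),
        None => Err("BuiltIn function not supported".to_string()),
    }
}

fn main() {
    let w = BuiltInWindowExpr {
        expr: Arc::new(WindowUDFExpr { udf_name: "row_number".to_string(), num_args: 0 }),
    };
    println!("{:?}", serialize_built_in(&w)); // Ok("udwf:row_number/0")
}
```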

@jcsherin
Contributor

> But despite the name, BuiltInWindowExpr actually now wraps a UDWF.

Yes, this is indeed the case.

None of the user-defined window functions are currently serialized. We missed this during the migration.

I expect serialization of user-defined window functions to be simpler than that of built-in window functions, because built-in window function serialization depended on their internals. You can see that in the serialization code here:

```rust
// ...
} else if let Some(window_shift_expr) =
    built_in_fn_expr.downcast_ref::<WindowShift>()
{
    args.insert(
        1,
        Arc::new(Literal::new(datafusion_common::ScalarValue::Int64(Some(
            window_shift_expr.get_shift_offset(),
        )))),
    );
    args.insert(
        2,
        Arc::new(Literal::new(window_shift_expr.get_default_value())),
    );
    if window_shift_expr.get_shift_offset() >= 0 {
        protobuf::BuiltInWindowFunction::Lag
    } else {
        protobuf::BuiltInWindowFunction::Lead
    }
// ...
```
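The excerpt shows why built-in serialization was tied to internals: a single shift expression backs both `lag` and `lead`, so the serializer rebuilds the offset and default-value arguments and picks the function from the offset's sign. A self-contained sketch of just that logic, using hypothetical simplified types (`WindowShift`, `Arg`, `ProtoWindowFunction`) rather than DataFusion's:

```rust
#[derive(Debug, PartialEq)]
enum ProtoWindowFunction {
    Lag,
    Lead,
}

// Hypothetical simplified argument representation.
#[derive(Debug, Clone, PartialEq)]
enum Arg {
    Column(String),
    Int64(i64),
    Null,
}

// Simplified stand-in for a shift expression: a signed offset plus a default value.
struct WindowShift {
    shift_offset: i64,
    default_value: Arg,
}

fn serialize_shift(shift: &WindowShift, mut args: Vec<Arg>) -> (ProtoWindowFunction, Vec<Arg>) {
    // Re-insert the offset and default as arguments 1 and 2, after the
    // input column (args must already contain that column).
    args.insert(1, Arg::Int64(shift.shift_offset));
    args.insert(2, shift.default_value.clone());
    // A non-negative offset is encoded as Lag, a negative one as Lead.
    let fun = if shift.shift_offset >= 0 {
        ProtoWindowFunction::Lag
    } else {
        ProtoWindowFunction::Lead
    };
    (fun, args)
}

fn main() {
    let lead = WindowShift { shift_offset: -1, default_value: Arg::Null };
    let (fun, args) = serialize_shift(&lead, vec![Arg::Column("b".to_string())]);
    println!("{fun:?} {args:?}"); // Lead [Column("b"), Int64(-1), Null]
}
```

A UDWF, by contrast, could be recorded by its name and its arguments as given, which is the simplification anticipated above.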

We also need to extend PhysicalExtensionCodec to support user-defined window functions.
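One possible shape for that extension, sketched under assumptions: DataFusion's `PhysicalExtensionCodec` already has `try_encode_udf`/`try_decode_udf` hooks for scalar UDFs, and the trait below assumes UDWF hooks would mirror them. `ExtensionCodec`, `try_encode_udwf`, `try_decode_udwf`, `RegistryCodec`, and `DefaultCodec` are hypothetical names in a self-contained model, not the actual API.

```rust
use std::collections::HashMap;

// Hypothetical, simplified handle for a user-defined window function.
#[derive(Debug, Clone, PartialEq)]
struct WindowUdf {
    name: String,
}

// Hypothetical codec trait with UDWF hooks shaped like the scalar-UDF ones.
trait ExtensionCodec {
    // Default encode writes no extra payload; the name is recorded separately.
    fn try_encode_udwf(&self, _node: &WindowUdf, _buf: &mut Vec<u8>) -> Result<(), String> {
        Ok(())
    }

    // Default decode fails: without help from the codec, a name cannot be resolved.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<WindowUdf, String> {
        Err(format!("UDWF not supported by codec: {name}"))
    }
}

// Codec that relies entirely on the defaults.
struct DefaultCodec;
impl ExtensionCodec for DefaultCodec {}

// Codec that resolves UDWFs from a name-keyed registry.
struct RegistryCodec {
    udwfs: HashMap<String, WindowUdf>,
}

impl ExtensionCodec for RegistryCodec {
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<WindowUdf, String> {
        self.udwfs
            .get(name)
            .cloned()
            .ok_or_else(|| format!("unknown UDWF: {name}"))
    }
}

// Encode the function (name plus optional payload), then decode it back.
fn roundtrip(codec: &dyn ExtensionCodec, udwf: &WindowUdf) -> Result<WindowUdf, String> {
    let mut buf = Vec::new();
    codec.try_encode_udwf(udwf, &mut buf)?;
    codec.try_decode_udwf(&udwf.name, &buf)
}

fn main() {
    let row_number = WindowUdf { name: "row_number".to_string() };
    let codec = RegistryCodec {
        udwfs: HashMap::from([(row_number.name.clone(), row_number.clone())]),
    };
    println!("{:?}", roundtrip(&codec, &row_number));
    println!("{:?}", roundtrip(&DefaultCodec, &row_number));
}
```

The default decode fails on purpose, so a plan referencing a UDWF the codec does not know about errors out instead of silently deserializing to something else.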
