
feat: Add datafusion-spark crate #14392

Open · wants to merge 69 commits into base: main

Conversation

andygrove
Member

@andygrove andygrove commented Jan 31, 2025

Which issue does this PR close?

Closes #5600

Rationale for this change

See discussion in #5600

TL;DR Many projects want Spark-compatible expressions for use with DataFusion. We have some in Comet and there are also some in the Sail project. Let's move Comet's expressions into the core repo and collaborate there going forward.

What changes are included in this PR?

  • Add copy of datafusion-comet-spark-expr crate with history
  • Rename to datafusion-spark
  • Add to workspace
  • Pass CI checks

Are these changes tested?

Are there any user-facing changes?

andygrove and others added 30 commits July 10, 2024 13:32
…-compatible DataFusion expressions (apache#638)

* convert into workspace project

* update GitHub actions

* update Makefile

* fix regression

* update target path

* update protobuf path in pom.xml

* update more paths

* add new datafusion-comet-expr crate

* rename CometAbsFunc to Abs and add documentation

* fix error message

* improve error handling

* update crate description

* remove unused dep

* address feedback

* finish renaming crate

* update README for datafusion-spark-expr

* rename crate to datafusion-comet-spark-expr
* refactor in preparation for moving cast to spark-expr crate

* errors

* move cast to spark-expr crate

* machete

* refactor errors

* clean up imports
…che#660)

* Move temporal expressions to spark-expr crate

* reduce public api

* reduce public api

* update imports in benchmarks

* fmt

* remove unused dep
…pache#627)

* dedup code

* transforming the dict directly

* code optimization for cast string to timestamp

* minor optimizations

* fmt fixes and casting to dict array without unpacking to array first

* bug fixes

* revert unrelated change

* Added test case and code refactor

* minor optimization

* minor optimization again

* convert the cast to array

* Revert "convert the cast to array"

This reverts commit 9270aedeafa12dacabc664ca9df7c85236e05d85.

* bug fixes

* rename the test to cast_dict_to_timestamp arr
* chore: Make rust clippy happy

Signed-off-by: Xuanwo <[email protected]>

* Format code

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
* Unify IF and CASE expressions

* revert test changes

* fix
…apache#741)

## Which issue does this PR close?

Part of apache#670

## Rationale for this change

This PR improves the native execution performance on decimals with a small precision

## What changes are included in this PR?

This PR avoids promoting decimal128 to decimal256 when the precisions are small enough

## How are these changes tested?

Existing tests
* Add GetStructField support

* Add custom types to CometBatchScanExec

* Remove test explain

* Rust fmt

* Fix struct type support checks

* Support converting StructArray to native

* fix style

* Attempt to fix scalar subquery issue

* Fix other unit test

* Cleanup

* Default query plan supporting complex type to false

* Migrate struct expressions to spark-expr

* Update shouldApplyRowToColumnar comment

* Add nulls to test

* Rename to allowStruct

* Add DataTypeSupport trait

* Fix parquet datatype test
## Which issue does this PR close?

## Rationale for this change

This PR improves read_side_padding, which is used for CHAR() schemas

## What changes are included in this PR?

Optimized spark_read_side_padding

## How are these changes tested?

Added tests
* feat: Optimize CreateNamedStruct to preserve dictionaries

Instead of serializing the return data_type we just serialize the field names. The original implementation was done that way because it led to a slightly simpler implementation, but it is clear from apache#750 that this was the wrong choice and leads to issues with the physical data_type.

* Support dictionary data_types in StructVector and MapVector

* Add length checks
…crates (apache#859)

* feat: Enable `clippy::clone_on_ref_ptr` on `proto` and `spark_exprs` crates
…he#870)

* basic version of string to float/double/decimal

* docs

* update benches

* update benches

* rust doc
* add skeleton for StructsToJson

* first test passes

* add support for nested structs

* add support for strings and improve test

* clippy

* format

* prepare for review

* remove perf results

* update user guide

* add microbenchmark

* remove comment

* update docs

* reduce size of diff

* add failing test for quotes in field names and values

* test passes

* clippy

* revert a docs change

* Update native/spark-expr/src/to_json.rs

Co-authored-by: Emil Ejbyfeldt <[email protected]>

* address feedback

* support tabs

* newlines

* backspace

* clippy

* fix test regression

* cargo fmt

---------

Co-authored-by: Emil Ejbyfeldt <[email protected]>
* Init

* test

* test

* test

* Use specified commit to test

* Fix format

* fix clippy

* fix

* fix

* Fix

* Change to SQL syntax

* Disable SMJ LeftAnti with join filter

* Fix

* Add test

* Add test

* Update to last DataFusion commit

* fix format

* fix

* Update diffs
* update dependency version

* update avg

* update avg_decimal

* update sum_decimal

* variance

* stddev

* covariance

* correlation

* save progress

* code compiles

* clippy

* remove duplicate of down_cast_any_ref function

* remove duplicate of down_cast_any_ref function

* machete

* bump DF version again and use StatsType from DataFusion

* implement groups accumulator for stddev and variance

* refactor

* fmt

* revert group accumulator
* date_add test case.

* Add DateAdd to proto and QueryPlanSerde. Next up is the native side.

* Add DateAdd in planner.rs that generates a Literal for right child. Need to confirm if any other type of expression can occur here.

* Minor refactor.

* Change test predicate to actually select some rows.

* Switch to scalar UDF implementation for date_add.

* Docs and minor refactor.

* Add a new test to explicitly cover array scenario.

* cargo clippy fixes

* Fix Scala 2.13.

* New approved plans for q72 due to date_add.

* Address first round of feedback.

* Add date_sub and tests.

* Fix error message to be more general.

* Update error message for Spark 4.0+

* Support Int8 and Int16 for days.
* Start working on GetArrayStructFields

* Almost have working

* Working

* Add another test

* Remove unused

* Remove unused sql conf
## Which issue does this PR close?

## Rationale for this change

Arrow-rs 53.1.0 includes performance improvements 

## What changes are included in this PR?

Bumping arrow-rs to 53.1.0 and datafusion to a revision

## How are these changes tested?

existing tests
@comphead
Contributor

I'm wondering how the user should choose which function to use. For instance, the to_timestamp function may behave differently between non-Spark and Spark environments.

So the developer implements a Spark-compliant to_timestamp function in the crate, but when they run the query select to_timestamp() from t1, which implementation exactly will be picked up? Should we introduce something like a FUNCTION CATALOG or similar to switch between implementations?

@andygrove andygrove marked this pull request as ready for review January 31, 2025 20:38
@andygrove andygrove requested review from alamb and viirya January 31, 2025 20:44
@andygrove
Member Author

I'm wondering how the user should choose which function to use. For instance, the to_timestamp function may behave differently between non-Spark and Spark environments.

In the short term, the main goal is to allow downstream projects to add these functions to a DataFusion context, but users could manually configure their DataFusion context to choose which functions to use.
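For illustration only (a sketch, not part of this PR and not the crate's actual API): assuming the new crate exposes its functions as ScalarUDF values, a downstream project could register the ones it wants on a SessionContext, roughly like this:

use datafusion::logical_expr::ScalarUDF;
use datafusion::prelude::SessionContext;

// Hypothetical helper: register a chosen set of Spark-compatible UDFs.
// Registering a UDF under a name that already exists is expected to replace
// the built-in function for subsequent SQL planning in this context.
fn register_spark_udfs(ctx: &SessionContext, udfs: Vec<ScalarUDF>) {
    for udf in udfs {
        ctx.register_udf(udf);
    }
}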

Cargo.toml Outdated
Comment on lines 33 to 37
"datafusion/functions-table",
"datafusion/functions-nested",
"datafusion/functions-spark",
"datafusion/functions-window",
"datafusion/functions-window-common",
Member


Hmm, it is good to add this crate. But I'm wondering: shouldn't our other function crates, like functions-nested, also follow Spark behavior? So it sounds like all these functions should be in datafusion-functions-spark?

Contributor

@alamb alamb Jan 31, 2025


🤔 you are right that it is kind of strange to have all spark functions in a single crate when we have the other functions in more fine-grained crates

Maybe we should make one crate like datafusion_spark that holds all the different types of functions:

i    "datafusion/functions-spark",
+    "datafusion/spark",

And then over time we can add things like datafusion_spark_functions_nested in datafusion/spark/functions-nested 🤔

Member Author


Thanks @viirya. That is a good point. I renamed functions-spark to spark for now.

@alamb
Contributor

alamb commented Jan 31, 2025

I'm wondering how the user should choose which function to use. For instance, the to_timestamp function may behave differently between non-Spark and Spark environments.

So the developer implements a Spark-compliant to_timestamp function in the crate, but when they run the query select to_timestamp() from t1, which implementation exactly will be picked up? Should we introduce something like a FUNCTION CATALOG or similar to switch between implementations?

What I was imagining was that, just like today, DataFusion comes with some set of "included" functions (mostly following the postgres semantics). Nothing new is added to the core

For users that want to use the spark functions, I am hoping we have something like

use datafusion_functions_spark::install_spark;
...
// by default this SessionContext has all the same functions as it does today
let mut ctx = SessionContext::new();
// for users who want spark compatible functions, install the 
// spark compatibility functions, potentially overriding
// some/all "built in functions", rewrites, etc
install_spark(&mut ctx)?;
// running queries now use spark compatible syntax
ctx.sql(...)

@andygrove andygrove changed the title feat: Add datafusion-functions-spark crate feat: Add datafusion-spark crate Jan 31, 2025

/// AVG aggregate expression
#[derive(Debug, Clone)]
pub struct Avg {
Contributor


not needed for this PR but I am interested why there is a separate AVG implementation

Member Author


One difference is that this version uses an i64 for the count in the accumulator rather than a u64 (which makes sense because Java does not have unsigned ints).
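To illustrate the difference (a minimal sketch, not the PR's actual accumulator code): the state tracks the count as i64, matching Spark/JVM semantics, where the DataFusion default uses u64.

// Illustrative only: the shape of a Spark-style average accumulator state.
#[derive(Debug, Default)]
struct SparkAvgState {
    sum: f64,   // running sum of non-null inputs
    count: i64, // i64 rather than u64 because the JVM has no unsigned integers
}

impl SparkAvgState {
    fn update(&mut self, value: Option<f64>) {
        if let Some(v) = value {
            self.sum += v;
            self.count += 1;
        }
    }

    fn evaluate(&self) -> Option<f64> {
        // avg of zero rows is NULL
        (self.count > 0).then(|| self.sum / self.count as f64)
    }
}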

Member Author


Of course, we should really have some documentation for these expressions explaining what the differences are.

Contributor


Subtle differences (e.g., i32 instead of i64, nullable instead of non-nullable, microseconds instead of nanoseconds, etc.) between the DataFusion "default" implementation and the Spark implementation are going to be a common theme with many of these UDFs.

@andygrove @alamb This is where having something like #14296 (comment) would be useful. Otherwise we will always have to keep track of any changes made to the original DataFusion "default" implementation.

Contributor

@alamb alamb left a comment


As I have mentioned previously I think this is a great idea.

In order to really move it into DataFusion though I think we should:

  1. Have tests of this code in DataFusion (not via spark, but maybe via sqllogictests)
  2. Have some way to register these functions so other users can use them

But in general the idea is 😍 to me

cc @Blizzara and @shehabgamin as you have mentioned interest in helping here

use std::fmt::Debug;
use std::sync::Arc;

macro_rules! make_comet_scalar_udf {
Contributor


we would probably want to rename this eventually so it didn't refer to comet

@shehabgamin
Contributor

cc @Blizzara and @shehabgamin as you have mentioned interest in helping here

Will catch up on this thread tonight. So much happening so fast, exciting!

Contributor

@shehabgamin shehabgamin left a comment


Function Names

I'm wondering how the user should choose which function to use. For instance, the to_timestamp function may behave differently between non-Spark and Spark environments.

In the short term, the main goal is to allow downstream projects to add these functions to a DataFusion context, but users could manually configure their DataFusion context to choose which functions to use.

What I was imagining was that, just like today, DataFusion comes with some set of "included" functions (mostly following the postgres semantics). Nothing new is added to the core

For users that want to use the spark functions, I am hoping we have something like

use datafusion_functions_spark::install_spark;
...
// by default this SessionContext has all the same functions as it does today
let mut ctx = SessionContext::new();
// for users who want spark compatible functions, install the 
// spark compatibility functions, potentially overriding
// some/all "built in functions", rewrites, etc
install_spark(&mut ctx)?;
// running queries now use spark compatible syntax
ctx.sql(...)

@comphead @andygrove @alamb It may be a good idea to prefix all function names with spark_ to avoid confusion, conflicts, or unknown behavior between functions that share the same name.

Subtle UDF Differences

See comment here: #14392 (comment)

Testing

In order to really move it into DataFusion though I think we should:

1. Have tests of this code in DataFusion (not via spark, but maybe via sqllogictests)

All functions that Sail ports over can be tested without the JVM or Spark Client. Because there is no JVM involvement when running a Sail server, it would be a relatively straightforward process to port function tests from Sail upstream.

1. Gold Data Tests -> SLT

Gold Data tests are SQL queries; functions get tested using scalar input. We should be able to port the vast majority of Gold Data tests for functions upstream to the SLTs (a sketch of what such a case might look like follows this list).
The only caveat here is that Spark’s SQL semantics (Hive dialect) are different from DataFusion’s SQL semantics.
The implication of this is that some Gold Data tests can’t be ported upstream. However, I expect that the number of tests we can’t port over will be very small. The vast majority of Gold Data tests specifically for functions are very basic queries (e.g., SELECT ascii(2)).

2. Spark Functions Doc Tests -> SLT

These tests use the PySpark client (no JVM is required), and functions generally get tested using Array input (e.g., by calling a function with some DataFrame column as input). We should be able to translate almost all of these tests (if not all) to SQL.
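As a concrete, hypothetical illustration of a ported case in DataFusion's sqllogictest format, assuming the Spark-compatible function is registered under its Spark name:

# hypothetical .slt case for a Spark-compatible scalar function
query I
SELECT ascii('a')
----
97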

Code Organization, Sail Contributions, etc.

It probably makes the most sense to merge this PR first before diving into these topics.

@jayzhan211
Contributor

jayzhan211 commented Feb 2, 2025

It may be a good idea to prefix all function names with spark_ to avoid confusion, conflicts, or unknown behavior between functions that share the same name.

In the optimizer we rely on the name to do such optimizations, so if we rename it to something like 'spark_count' we might need to add the Spark name to those optimizer rules as well, which increases the maintenance cost. If we assume the DataFusion native functions and the Spark functions are mutually exclusive (I guess we do), then having consistent names for the optimizer is the preferred choice.

Have some way to register these functions so other users can use them

It would be cool if we could switch between the Spark and native functions via configuration (overriding functions at registration when the flag is switched). It would be useful for testing in sqllogictest.

@shehabgamin
Contributor

In the optimizer we rely on the name to do such optimizations, so if we rename it to something like 'spark_count' we might need to add the Spark name to those optimizer rules as well, which increases the maintenance cost. If we assume the DataFusion native functions and the Spark functions are mutually exclusive (I guess we do), then having consistent names for the optimizer is the preferred choice.

I'm glad you brought this up, @jayzhan211.

Some Spark functions behave identically to DataFusion functions but have different names. For example:

  • Spark’s startswith(str, substr) corresponds to DataFusion’s expr_fn::starts_with(str, substr)

There are also cases where functions take input arguments in a different order. For example:

  • Spark’s position(substr, str) corresponds to DataFusion’s expr_fn::strpos(str, substr)
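One way to picture the argument-order case is a thin wrapper on the expression API (a sketch, not from this PR; the expr_fn::strpos re-export path is assumed from the names mentioned above):

use datafusion::functions::expr_fn::strpos; // assumed re-export location
use datafusion::logical_expr::Expr;

/// Spark-style position(substr, str), delegating to DataFusion's strpos(str, substr).
pub fn position(substr: Expr, string: Expr) -> Expr {
    strpos(string, substr)
}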

@jayzhan211
Contributor

jayzhan211 commented Feb 2, 2025

In the optimizer we rely on the name to do such optimizations, so if we rename it to something like 'spark_count' we might need to add the Spark name to those optimizer rules as well, which increases the maintenance cost. If we assume the DataFusion native functions and the Spark functions are mutually exclusive (I guess we do), then having consistent names for the optimizer is the preferred choice.

I'm glad you brought this up, @jayzhan211.

Some Spark functions behave identically to DataFusion functions but have different names. For example:

  • Spark’s startswith(str, substr) corresponds to DataFusion’s expr_fn::starts_with(str, substr)

There are also cases where functions take input arguments in a different order. For example:

  • Spark’s position(substr, str) corresponds to DataFusion’s expr_fn::strpos(str, substr)

If the behavior is the same we don't have any reason to copy it into the spark crate; adding an alias to the function is enough.

If the difference between the native and Spark versions is minor, we could also add a flag to switch the code logic instead of maintaining a copy in another crate, so that the maintenance cost is minimized.

@shehabgamin
Contributor

If the behavior is the same we don't have any reason to copy it into the spark crate; adding an alias to the function is enough.

@jayzhan211 I’ll need to verify this, but I recall encountering a case in DataFusion where two function names, A and B, are mapped inversely in Spark. Specifically, Spark's B corresponds to DataFusion's A, and Spark's A corresponds to DataFusion's B.

@andygrove
Member Author

@comphead @andygrove @alamb It may be a good idea to prefix all function names with spark_ to avoid confusion, conflicts, or unknown behavior between functions that share the same name.

For Comet, we need the function names to match Spark.

@andygrove
Member Author

For Comet, we need the function names to match Spark.

Actually, this isn't true. In Comet, we just need the Scala wrapper classes to have the same function names as Spark.

@comphead
Contributor

comphead commented Feb 2, 2025

That is exactly what I was mentioning in #14392 (comment): I'm not sure how to separate out functions with the same names. I think this crate and PR make a lot of sense until the user wants a mixed implementation. Another concern is that the code is duplicated, so changes to a base function may not be reflected in the Spark variant.

@andygrove @alamb maybe we could do conditional compilation at the function level instead of a separate crate?
Sort of introduce a spark feature and have different implementations:

#[cfg(spark)]
fn to_timestamp() {}

#![cfg(spark)]
fn to_timestamp() {}
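A rough sketch of that idea with a Cargo feature named spark (the feature name and function are illustrative, not from this PR):

// Cargo.toml (illustrative):
// [features]
// spark = []

#[cfg(not(feature = "spark"))]
pub fn to_timestamp_semantics() -> &'static str {
    // compiled in by default
    "datafusion-default"
}

#[cfg(feature = "spark")]
pub fn to_timestamp_semantics() -> &'static str {
    // compiled in only when building with --features spark
    "spark-compatible"
}

Note that with cfg gating only one implementation exists in a given build, whereas registering functions on a SessionContext (as proposed above) lets both coexist and be chosen per context.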

@andygrove
Member Author

I'm going to move this to draft for now. Let's keep the conversation going, though.

I plan on working with the community to start adding tests to the datafusion-comet-spark-expr crate that don't depend on Spark, and to add some examples, as those are two large areas of valid concern.

Successfully merging this pull request may close these issues: [DISCUSSION] Add separate crate to cover spark builtin functions