feat: rand expression support #1199

Open
wants to merge 8 commits into base: main

akupchinskiy

Which issue does this PR close?

Closes #1198

Rationale for this change

Adds support for the Spark rand() expression.

What changes are included in this PR?

  • rand() expression implementation
  • partition-awareness of the planner

How are these changes tested?

Spark compatibility tests and an expression correctness test are included in the PR.

@codecov-commenter

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 34.11%. Comparing base (58dee73) to head (7e4ca2c).
Report is 2 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1199      +/-   ##
============================================
- Coverage     34.78%   34.11%   -0.67%     
+ Complexity      957      925      -32     
============================================
  Files           115      115              
  Lines         43569    43586      +17     
  Branches       9528     9556      +28     
============================================
- Hits          15155    14870     -285     
- Misses        25449    25763     +314     
+ Partials       2965     2953      -12     

@andygrove
Member

Thanks @akupchinskiy. I plan on reviewing this after the holidays.

@mbutrovich
Contributor

Are the partition related changes necessary for this PR? Otherwise, it might be better to reduce the scope to just the rand() expression.

Comment on lines 32 to 33
const DOUBLE_UNIT: f64 = 1.1102230246251565e-16;
const SPARK_MURMUR_ARRAY_SEED: u32 = 0x3c074a61;
Contributor

It would be really helpful if you could add documentation / references around these constants.

Author

Added doc comments with all the references.
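
For context on the constants discussed above, here is a minimal sketch of how a Spark-compatible generator might use DOUBLE_UNIT. It assumes the xorshift step (shifts of 21, 35, and 4) used by Spark's XORShiftRandom and the 26-bit/27-bit composition used by java.util.Random.nextDouble. The type name XorShiftSketch is hypothetical and not taken from the PR. Seed hashing with SPARK_MURMUR_ARRAY_SEED is left out, so the constructor expects a seed that has already been hashed.

```rust
/// 2^-53: scales a 53-bit integer into the half-open range [0, 1).
const DOUBLE_UNIT: f64 = 1.1102230246251565e-16;

/// Hypothetical stand-in for the generator state (not the PR's actual type).
struct XorShiftSketch {
    seed: i64,
}

impl XorShiftSketch {
    /// Assumes `hashed_seed` was already mixed (the murmur step is omitted here).
    fn new(hashed_seed: i64) -> Self {
        Self { seed: hashed_seed }
    }

    /// One xorshift step; returns the low `bits` bits of the new state.
    fn next(&mut self, bits: u8) -> i32 {
        let mut s = self.seed ^ (self.seed << 21);
        // The JVM's `>>>` is a logical shift, so go through u64.
        s ^= ((s as u64) >> 35) as i64;
        s ^= s << 4;
        self.seed = s;
        (s & ((1i64 << bits) - 1)) as i32
    }

    /// Combines 26 high bits and 27 low bits into a 53-bit value, then scales it.
    fn next_double(&mut self) -> f64 {
        let hi = (self.next(26) as i64) << 27;
        let lo = self.next(27) as i64;
        (hi + lo) as f64 * DOUBLE_UNIT
    }
}
```

Because the largest 53-bit value times 2^-53 is still below 1, every result stays in [0, 1), and two generators built from the same seed produce the same sequence.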

@@ -317,7 +317,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
   // query plan, we need to defer stream initialization to first time execution.
   if exec_context.root_op.is_none() {
       let start = Instant::now();
-      let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx))
+      let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition)
Contributor

This is interesting. Is there any reason the partition is not used in the Comet native physical planner? It is definitely used in the DF physical plan during plan node execution: https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/execution_plan.rs#L371

Author

For some reason, the Spark partition index is erased when a native DF plan is sent for execution: https://github.com/apache/datafusion-comet/blob/main/native/core/src/execution/jni_api.rs#L496

Member

This is something that I would like to see improved. We currently use partition 0 for each native plan rather than the real partition id.

Author

@andygrove Can I do it as part of this PR, or would it be better to create a separate one?

@akupchinskiy
Author

Are the partition related changes necessary for this PR? Otherwise, it might be better to reduce the scope to just the rand() expression.

There are a handful of expressions besides rand() that rely on the partition index. All of them implement the Nondeterministic trait, which provides a hook method to initialize state before a partition is evaluated in the Spark runtime.

Encapsulation-wise, I agree that the scope of the partition exposure should be limited. But I could not find another way to extract it other than making it a part of a planner struct.
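
The partition dependence described above can be sketched as follows. This assumes that Spark's rand() seeds its generator with the user-supplied seed plus the partition index when a partition is initialized. PlannerSketch and partition_seed are hypothetical names used for illustration, not the PR's API.

```rust
/// Hypothetical planner that remembers which Spark partition it plans for.
struct PlannerSketch {
    partition: i32,
}

impl PlannerSketch {
    fn new(partition: i32) -> Self {
        Self { partition }
    }

    /// Derives the per-partition seed for a rand(seed) expression.
    /// Wrapping addition mirrors overflow behavior of a JVM long.
    fn partition_seed(&self, seed: i64) -> i64 {
        seed.wrapping_add(self.partition as i64)
    }
}
```

With a planner that always reports partition 0, every partition would derive the same seed and therefore the same random sequence, which is why the real partition index matters for this expression.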

Development

Successfully merging this pull request may close these issues.

Add support of rand() expression
7 participants