Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CometExec's outputPartitioning might not be same as Spark expects after AQE interferes #299

Merged
merged 3 commits into from
Apr 23, 2024

Conversation

viirya
Copy link
Member

@viirya viirya commented Apr 22, 2024

Which issue does this PR close?

Closes #298.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@viirya
Copy link
Member Author

viirya commented Apr 22, 2024

cc @sunchao @andygrove

@viirya viirya force-pushed the fix_output_partitioning branch from 45b4588 to d1257ea Compare April 22, 2024 14:47
@@ -377,7 +383,8 @@ case class CometProjectExec(
override val output: Seq[Attribute],
child: SparkPlan,
override val serializedPlanOpt: SerializedPlan)
extends CometUnaryExec {
extends CometUnaryExec
with PartitioningPreservingUnaryExecNode {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PartitioningPreservingUnaryExecNode implements outputPartitioning for ProjectExec.

@@ -586,7 +607,8 @@ case class CometHashAggregateExec(
mode: Option[AggregateMode],
child: SparkPlan,
override val serializedPlanOpt: SerializedPlan)
extends CometUnaryExec {
extends CometUnaryExec
with PartitioningPreservingUnaryExecNode {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PartitioningPreservingUnaryExecNode implements outputPartitioning for HashAggregateExec.

* This is copied from Spark's `PartitioningPreservingUnaryExecNode` because it is only available
* in Spark 3.4+. This is a workaround to make it available in Spark 3.2+.
*/
trait PartitioningPreservingUnaryExecNode extends UnaryExecNode with AliasAwareOutputExpression {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark 3.4 has a bit more change on outputPartitioning of some nodes like ProjectExec. Copied it from Spark 3.4.

Comment on lines 490 to +492
extends CometUnaryExec {

override def outputPartitioning: Partitioning = child.outputPartitioning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to add this outputPartitioning to CometUnaryExec as the default?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Spark's default outputPartitioning is UnknownPartitioning, if we add child.outputPartitioning to CometUnaryExec as default, it will possibly change outputPartitioning if we don't notice it.

Currently CometExec uses original Spark plan's outputPartitioning as default which is safer option, I think. Except for the case that Spark dynamically changes output partitioning during execution like AQE, it should be correct because Comet doesn't change output partitioning from Spark.

Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not an expert on the specific changes but it looks like we are just porting Spark logic over, so LGTM.

@viirya viirya force-pushed the fix_output_partitioning branch from 208d9e2 to f0cb1cb Compare April 23, 2024 14:53
@viirya
Copy link
Member Author

viirya commented Apr 23, 2024

Thank you @andygrove

@viirya viirya merged commit f789c21 into apache:main Apr 23, 2024
28 checks passed
@viirya
Copy link
Member Author

viirya commented Apr 23, 2024

Merged. Thanks.

@viirya viirya deleted the fix_output_partitioning branch April 23, 2024 16:04
himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
…s after AQE interferes (apache#299)

* fix: CometExec's outputPartitioning might not be same as Spark expects after AQE interferes

* Add compatibility with Spark 3.2 and 3.3

* Remove unused import
himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

CometExec's outputPartitioning might not be same as Spark expects after AQE interferes
2 participants