Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Aggregation without aggregation expressions should use correct result expressions #175

Merged
merged 1 commit into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1783,6 +1783,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
if (aggregateExpressions.isEmpty) {
val hashAggBuilder = OperatorOuterClass.HashAggregate.newBuilder()
hashAggBuilder.addAllGroupingExprs(groupingExprs.map(_.get).asJava)
val attributes = groupingExpressions.map(_.toAttribute) ++ aggregateAttributes
val resultExprs = resultExpressions.map(exprToProto(_, attributes))
if (resultExprs.exists(_.isEmpty)) {
emitWarning(s"Unsupported result expressions found in: ${resultExpressions}")
return None
}
Comment on lines +1786 to +1791
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even aggregateExpressions is empty, we still need to assign result expressions. This is the latest bug I found during fixing TPCDS query failures in SortMergeJoin work.

hashAggBuilder.addAllResultExprs(resultExprs.map(_.get).asJava)
Some(result.setHashAgg(hashAggBuilder).build())
} else {
val modes = aggregateExpressions.map(_.mode).distinct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._

test("Aggregation without aggregate expressions should use correct result expressions") {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test")
makeParquetFile(path, 10000, 10, false)
withParquetTable(path.toUri.toString, "tbl") {
val df = sql("SELECT _g5 FROM tbl GROUP BY _g1, _g2, _g3, _g4, _g5")
checkSparkAnswer(df)
}
}
}
}

test("Final aggregation should not bind to the input of partial aggregation") {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
Expand Down
Loading