feat: add ExpandRel support to core and spark #295

Merged
merged 2 commits into from
Sep 27, 2024

andrew-coleman
Contributor

This PR implements the Expand relation in the core and spark modules.
It is required in the spark module to support optimised queries that contain distinct aggregations, and increases the number of successful test cases in the TPCDS suite.

@andrew-coleman
Contributor Author

andrew-coleman commented Sep 20, 2024

osv-scanner failed due to a vulnerability in protobuf-java, so I've updated this to its latest version, 3.25.5.
https://osv.dev/vulnerability/GHSA-735f-pc8j-v9w8

Member

@vbarua vbarua left a comment

Left some questions and some comments.

@@ -21,6 +22,8 @@ public interface Rel {

List<Rel> getInputs();

Optional<RelCommon.Hint> getHint();

Member

We shouldn't include references to protobuf classes in this layer. To keep this layer independent of the protobufs we should introduce a Hint class and map it to and from protobuf.
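
To illustrate the layering suggested here, a minimal sketch follows. It assumes Java 16+ for records, and every name in it (`HintSketch`, `WireHint`, `toWire`, `fromWire`) is hypothetical. `WireHint` is only a stand-in for the generated protobuf message, with a null alias modelling an unset field; it is not the real `RelCommon.Hint` API.

```java
import java.util.List;
import java.util.Optional;

public class HintSketch {

  /** Stand-in for the wire-level message (assumption); a null alias models an unset field. */
  record WireHint(String aliasOrNull, List<String> outputNames) {}

  /** Protobuf-free value class that the core layer could expose instead. */
  record Hint(Optional<String> alias, List<String> outputNames) {}

  /** Core type to wire type. Only the two converters know about the wire type. */
  static WireHint toWire(Hint hint) {
    return new WireHint(hint.alias().orElse(null), List.copyOf(hint.outputNames()));
  }

  /** Wire type back to core type. */
  static Hint fromWire(WireHint wire) {
    return new Hint(Optional.ofNullable(wire.aliasOrNull()), List.copyOf(wire.outputNames()));
  }
}
```

With this shape, an interface such as `Rel` would only need to return the protobuf-free `Hint`, and the conversion would live next to the other proto converters.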

@@ -72,7 +71,8 @@ class ToSubstraitRel extends AbstractLogicalPlanVisitor with Logging {
val substraitExps = expression.aggregateFunction.children.map(toExpression(output))
val invocation =
SparkExtension.toAggregateFunction.apply(expression, substraitExps)
relation.Aggregate.Measure.builder.function(invocation).build()
val filter = expression.filter map toExpression(output)
relation.Aggregate.Measure.builder.function(invocation).preMeasureFilter(Optional.ofNullable(filter.orNull)).build()

Member

I assume including the filter is an opportunistic bug fix?

I don't think it needs to be its own PR, but in the future, if you could make it a separate commit with a fix(spark) style message, it makes it easier for me to notice and call out such changes in the release log.

Contributor Author

Sure, I've split this into a separate commit.

@@ -67,6 +67,7 @@ class FunctionMappings {
s[Count]("count"),
s[Min]("min"),
s[Max]("max"),
s[First]("any_value"),

Member

@vbarua vbarua Sep 20, 2024

While you can treat any_value as first, I don't think you can treat first as any_value. Is this mapping bi-directional?

Contributor Author

@andrew-coleman andrew-coleman Sep 24, 2024

Sure, my first proposal was to add the first function to functions_aggregate_general.yaml, but this suggestion was given as an alternative.
substrait-io/substrait#697 (comment)

Member

Ah, if Spark itself handles First as any_value, this makes sense. Though, truth be told, if I'm being pedantic, this sounds like Spark doesn't actually support First correctly. Not a Substrait problem, though.
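
The asymmetry discussed in this thread can be sketched roughly as follows. This is a simplified model, not Spark or Substrait code: all names are hypothetical, groups are assumed to contain no nulls, and the any_value contract is modelled as "the result is some element of the group".

```java
import java.util.List;
import java.util.Optional;

public class AnyValueSketch {

  /** Deterministic: always the first element of the group in its current order. */
  static <T> Optional<T> first(List<T> group) {
    return group.isEmpty() ? Optional.empty() : Optional.of(group.get(0));
  }

  /** A different deterministic pick, used only to show a second valid any_value result. */
  static <T> Optional<T> last(List<T> group) {
    return group.isEmpty() ? Optional.empty() : Optional.of(group.get(group.size() - 1));
  }

  /** any_value as modelled here: any element of the group is an acceptable answer. */
  static <T> boolean satisfiesAnyValue(List<T> group, Optional<T> result) {
    return result.map(group::contains).orElse(group.isEmpty());
  }
}
```

In this model, `first` always satisfies the any_value contract, so it can stand in for any_value. The reverse does not hold: `last` also satisfies any_value but returns a different element than `first` on a group such as `[3, 1, 2]`, so an engine given any_value is free to return something other than the first element.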

Member

@vbarua vbarua left a comment

Left some minor comments, can do a full pass tomorrow. I want to get this in this week.


public RelCommon.Hint toProto() {
var builder = RelCommon.Hint.newBuilder().addAllOutputNames(getOutputNames());
builder.setAlias(getAlias().orElse(""));

Member

I think it's preferable to leave this unset instead of setting it to "", otherwise we can't distinguish between an unset alias and the empty string.
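
One way to do this is to call the setter only when a value is present. The sketch below uses a hypothetical `StubHintBuilder` that merely records whether `setAlias` was called; it stands in for the generated builder and makes no claim about how the real protobuf builder tracks field presence.

```java
import java.util.Optional;

public class AliasSketch {

  /** Hypothetical stand-in for a generated builder; it only records calls. */
  static final class StubHintBuilder {
    private String alias = ""; // default value of a string that was never set
    private boolean aliasSet = false;

    StubHintBuilder setAlias(String value) {
      this.alias = value;
      this.aliasSet = true;
      return this;
    }

    boolean wasAliasSet() {
      return aliasSet;
    }

    String getAlias() {
      return alias;
    }
  }

  /** Calls setAlias only when an alias is present and leaves the field untouched otherwise. */
  static StubHintBuilder applyAlias(StubHintBuilder builder, Optional<String> alias) {
    alias.ifPresent(builder::setAlias);
    return builder;
  }
}
```

Compared with `builder.setAlias(getAlias().orElse(""))`, an explicitly empty alias and a missing alias no longer go through the same code path.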

return TypeCreator.of(initial.nullable())
.struct(
Stream.concat(
initial.fields().stream(), getFields().stream().map(ExpandField::getType)));

Member

From https://github.com/substrait-io/substrait/blob/1f3354d9f0f8b4425e98623f34d1e4578e2142bd/proto/substrait/algebra.proto#L420-L425

// Duplicates records by emitting one or more rows per input row.  The number of rows emitted per
// input row is the same for all input rows.
//
// In addition to a field being emitted per input field an extra int64 field is emitted which
// contains a zero-indexed ordinal corresponding to the duplicate definition.

It sounds like the output record type consists of

  1. All input fields
  2. A single extra int64 field

From what I'm seeing, you're outputting all input fields + one field per expansion field. Is that correct? I'm not super familiar with this relation.
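
For reference, here is a small sketch of the row-level behaviour as I read the protobuf comment quoted above. It is only an illustration under that reading: the names are hypothetical, rows are plain lists, and each duplicate definition is modelled as a function that returns a row of the same width as its input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ExpandSketch {

  /**
   * Emits one output row per duplicate definition for every input row. Each output row is
   * the projected row followed by the zero-indexed ordinal of the definition that produced it.
   */
  static List<List<Object>> expand(
      List<List<Object>> input, List<Function<List<Object>, List<Object>>> duplicates) {
    List<List<Object>> output = new ArrayList<>();
    for (List<Object> row : input) {
      for (int ordinal = 0; ordinal < duplicates.size(); ordinal++) {
        List<Object> out = new ArrayList<>(duplicates.get(ordinal).apply(row));
        out.add((long) ordinal); // the single extra int64 field
        output.add(out);
      }
    }
    return output;
  }

  /** One input row and two duplicate definitions: keep the row as is, then null out column 1. */
  static List<List<Object>> demo() {
    List<List<Object>> input = new ArrayList<>();
    input.add(Arrays.asList(1, "x"));
    List<Function<List<Object>, List<Object>>> duplicates = new ArrayList<>();
    duplicates.add(row -> row);
    duplicates.add(row -> Arrays.asList(row.get(0), null));
    return expand(input, duplicates);
  }
}
```

Under this reading, the output width is the input width plus one, regardless of how many duplicate definitions there are.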

Contributor Author

Yes, I think you're correct

Member

@vbarua vbarua left a comment

Left some minor comments, but nothing blocking. I will merge this before Friday at the latest (I've set a reminder for myself).

public Type.Struct deriveRecordType() {
Type.Struct initial = getInput().getRecordType();
return TypeCreator.of(initial.nullable())
.struct(Stream.concat(initial.fields().stream(), Stream.of(TypeCreator.REQUIRED.I64)));

Member

I just noticed that the protobuf comment

In addition to a field being emitted per input field an extra int64 field is emitted

disagrees with the written spec:

The expand fields followed by an i32 column describing the index of the duplicate that the row is derived from.

We should reconcile this eventually. I'm fine with picking one or the other for now, but we may have to change it in the future.

Made an issue for this: substrait-io/substrait#714

builder
.commonExtension(optionalAdvancedExtension(rel.getCommon()))
.remap(optionalRelmap(rel.getCommon()))
.hint(optionalHint(rel.getCommon()));

Member

Made an issue to track handling of Hint information for all relations: #297

@@ -82,11 +81,15 @@ class ToLogicalPlan(spark: SparkSession) extends DefaultRelVisitor[LogicalPlan]
)
throw new IllegalArgumentException(msg)
})

val filter = Option(measure.getPreMeasureFilter.orElse(null))
.map(_.accept(expressionConverter))

Member

Note to myself to include this fix in the release notes.

@@ -193,6 +196,27 @@ class ToLogicalPlan(spark: SparkSession) extends DefaultRelVisitor[LogicalPlan]
}
}

override def visit(expand: relation.Expand): LogicalPlan = {
val child = expand.getInput.accept(this)
val names = expand.getHint.get().getOutputNames.asScala

Member

What happens if this is not set? Can you generate names for this on the fly?

I'm okay with leaving this as is for now, but generally, when consuming a plan, hints should be treated as entirely optional for producers to set.
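
A possible fallback, sketched in Java for illustration only: use the hinted names when they are present and match the number of output fields, and otherwise generate positional names. The `resolveNames` helper and the `col<i>` naming scheme are hypothetical and not part of this PR.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OutputNamesSketch {

  /**
   * Returns the hinted names when they are present and match the field count,
   * otherwise generates positional placeholder names.
   */
  static List<String> resolveNames(Optional<List<String>> hintedNames, int fieldCount) {
    return hintedNames
        .filter(names -> names.size() == fieldCount)
        .orElseGet(
            () ->
                IntStream.range(0, fieldCount)
                    .mapToObj(i -> "col" + i) // hypothetical naming scheme
                    .collect(Collectors.toList()));
  }
}
```

With something like this, a missing hint would no longer fail on `getHint.get()`; the consumer would simply see generated column names.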

@vbarua vbarua merged commit 32fea18 into substrait-io:main Sep 27, 2024
12 checks passed
@andrew-coleman andrew-coleman deleted the expand_relation branch September 30, 2024 08:06
mbwhite pushed a commit to mbwhite/substrait-java that referenced this pull request Oct 1, 2024
feat(pojo): initial support for Hint messages
feat(pojo): builder support for ExpandRel

feat(spark): add mapping for any_value function
feat(spark): add support for consuming NullLiteral expressions
feat(spark): handle filter field on Measure