-
Notifications
You must be signed in to change notification settings - Fork 176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: Move all array_* serde to new framework, use correct INCOMPAT config #1349
Changes from all commits
2d97a57
7f00ab9
6460add
5c333b7
1c5e992
a3e9c46
d44bb0e
d6fa542
e523871
23accfa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -929,6 +929,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |||||
binding: Boolean): Option[Expr] = { | ||||||
SQLConf.get | ||||||
|
||||||
/**
 * Serializes the enclosing `expr` through the given serde handler, gating handlers that are
 * marked as incompatible.
 *
 * If `handler` is an `IncompatExpr` and `CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE` is not
 * enabled, an explanatory message is attached to `expr` via `withInfo` and `None` is returned.
 * Otherwise the call is delegated to `handler.convert(expr, inputs, binding)`.
 *
 * @param handler the serde implementation to use for `expr`
 * @return the result of `handler.convert`, or `None` when the handler is gated off
 */
def convert(handler: CometExpressionSerde): Option[Expr] = { | ||||||
handler match { | ||||||
// Handlers flagged as incompatible are only used when the user has opted in.
case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() => | ||||||
withInfo( | ||||||
expr, | ||||||
s"$expr is not fully compatible with Spark. To enable it anyway, set " + | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pushed a commit to update this by referencing a new |
||||||
s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.") | ||||||
None | ||||||
// Every other handler (or an opted-in incompatible one) is delegated to directly.
case _ => | ||||||
handler.convert(expr, inputs, binding) | ||||||
} | ||||||
} | ||||||
|
||||||
expr match { | ||||||
case a @ Alias(_, _) => | ||||||
val r = exprToProtoInternal(a.child, inputs, binding) | ||||||
|
@@ -2371,83 +2384,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |||||
withInfo(expr, "unsupported arguments for GetArrayStructFields", child) | ||||||
None | ||||||
} | ||||||
case expr: ArrayRemove => CometArrayRemove.convert(expr, inputs, binding) | ||||||
case expr if expr.prettyName == "array_contains" => | ||||||
createBinaryExpr( | ||||||
expr, | ||||||
expr.children(0), | ||||||
expr.children(1), | ||||||
inputs, | ||||||
binding, | ||||||
(builder, binaryExpr) => builder.setArrayContains(binaryExpr)) | ||||||
case _ if expr.prettyName == "array_append" => | ||||||
createBinaryExpr( | ||||||
expr, | ||||||
expr.children(0), | ||||||
expr.children(1), | ||||||
inputs, | ||||||
binding, | ||||||
(builder, binaryExpr) => builder.setArrayAppend(binaryExpr)) | ||||||
case _ if expr.prettyName == "array_intersect" => | ||||||
createBinaryExpr( | ||||||
expr, | ||||||
expr.children(0), | ||||||
expr.children(1), | ||||||
inputs, | ||||||
binding, | ||||||
(builder, binaryExpr) => builder.setArrayIntersect(binaryExpr)) | ||||||
case ArrayJoin(arrayExpr, delimiterExpr, nullReplacementExpr) => | ||||||
val arrayExprProto = exprToProto(arrayExpr, inputs, binding) | ||||||
val delimiterExprProto = exprToProto(delimiterExpr, inputs, binding) | ||||||
|
||||||
if (arrayExprProto.isDefined && delimiterExprProto.isDefined) { | ||||||
val arrayJoinBuilder = nullReplacementExpr match { | ||||||
case Some(nrExpr) => | ||||||
val nullReplacementExprProto = exprToProto(nrExpr, inputs, binding) | ||||||
ExprOuterClass.ArrayJoin | ||||||
.newBuilder() | ||||||
.setArrayExpr(arrayExprProto.get) | ||||||
.setDelimiterExpr(delimiterExprProto.get) | ||||||
.setNullReplacementExpr(nullReplacementExprProto.get) | ||||||
case None => | ||||||
ExprOuterClass.ArrayJoin | ||||||
.newBuilder() | ||||||
.setArrayExpr(arrayExprProto.get) | ||||||
.setDelimiterExpr(delimiterExprProto.get) | ||||||
} | ||||||
Some( | ||||||
ExprOuterClass.Expr | ||||||
.newBuilder() | ||||||
.setArrayJoin(arrayJoinBuilder) | ||||||
.build()) | ||||||
} else { | ||||||
val exprs: List[Expression] = nullReplacementExpr match { | ||||||
case Some(nrExpr) => List(arrayExpr, delimiterExpr, nrExpr) | ||||||
case None => List(arrayExpr, delimiterExpr) | ||||||
} | ||||||
withInfo(expr, "unsupported arguments for ArrayJoin", exprs: _*) | ||||||
None | ||||||
} | ||||||
case ArraysOverlap(leftArrayExpr, rightArrayExpr) => | ||||||
if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) { | ||||||
createBinaryExpr( | ||||||
expr, | ||||||
leftArrayExpr, | ||||||
rightArrayExpr, | ||||||
inputs, | ||||||
binding, | ||||||
(builder, binaryExpr) => builder.setArraysOverlap(binaryExpr)) | ||||||
} else { | ||||||
withInfo( | ||||||
expr, | ||||||
s"$expr is not supported yet. To enable all incompatible casts, set " + | ||||||
s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true") | ||||||
None | ||||||
} | ||||||
case _: ArrayRemove => convert(CometArrayRemove) | ||||||
case _: ArrayContains => convert(CometArrayContains) | ||||||
// Function introduced in 3.4.0. Refer by name to provide compatibility | ||||||
// with older Spark builds | ||||||
case _ if expr.prettyName == "array_append" => convert(CometArrayAppend) | ||||||
andygrove marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
case _: ArrayIntersect => convert(CometArrayIntersect) | ||||||
case _: ArrayJoin => convert(CometArrayJoin) | ||||||
case _: ArraysOverlap => convert(CometArraysOverlap) | ||||||
case _ => | ||||||
withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*) | ||||||
None | ||||||
} | ||||||
|
||||||
} | ||||||
|
||||||
/** | ||||||
|
@@ -3490,3 +3439,6 @@ trait CometExpressionSerde { | |||||
inputs: Seq[Attribute], | ||||||
binding: Boolean): Option[ExprOuterClass.Expr] | ||||||
} | ||||||
|
||||||
/**
 * Marker trait for an expression that is not guaranteed to be 100% compatible with Spark.
 *
 * The trait declares no members. The `convert` helper in `QueryPlanSerde` pattern-matches on it
 * (`case _: IncompatExpr`) and returns `None` for such handlers unless
 * `CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE` is enabled.
 */ | ||||||
trait IncompatExpr {} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Later, we could add a method to this trait to provide the reason why an expression is incompatible and this could be used to generate documentation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a blocker, but it looks like the compatibility guide does not mention the array-related issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I had updated this locally but forgot that these changes have to go in
compatibility-template.md
instead of compatibility.md
. 🤦♂️ I have added them now.