Skip to content

Commit

Permalink
[GLUTEN-7807] Bind attr with name if its exprId is not found in trans…
Browse files Browse the repository at this point in the history
…forming relation (#7819)
  • Loading branch information
yikf authored Nov 6, 2024
1 parent 17080e7 commit 49e1ca5
Showing 1 changed file with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.gluten.utils.ArrowAbiUtil
import org.apache.gluten.vectorized.{ColumnarBatchSerializerJniWrapper, NativeColumnarToRowJniWrapper}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.utils.SparkArrowUtil
Expand Down Expand Up @@ -103,6 +103,18 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra

var closed = false

val exprIds = output.map(_.exprId)
val projExpr = key.transformDown {
case attr: AttributeReference if !exprIds.contains(attr.exprId) =>
val i = output.count(_.name == attr.name)
if (i != 1) {
throw new IllegalArgumentException(s"Only one attr with the same name is supported: $key")
} else {
output.find(_.name == attr.name).get
}
}
val proj = UnsafeProjection.create(Seq(projExpr), output)

// Convert columnar to Row.
val jniWrapper = NativeColumnarToRowJniWrapper.create(runtime)
val c2rId = jniWrapper.nativeColumnarToRowInit()
Expand Down Expand Up @@ -141,7 +153,6 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
ColumnarBatches.getNativeHandle(batch),
0)
batch.close()
val proj = UnsafeProjection.create(Seq(key), output)

new Iterator[InternalRow] {
var rowId = 0
Expand Down

0 comments on commit 49e1ca5

Please sign in to comment.