Skip to content

Commit

Permalink
simplify code in CometExecIterator and avoid some small overhead (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored and kazuyukitanimura committed Jul 1, 2024
1 parent 2c4d0e4 commit 615ac97
Showing 1 changed file with 14 additions and 27 deletions.
41 changes: 14 additions & 27 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,6 @@ class CometExecIterator(
private var currentBatch: ColumnarBatch = null
private var closed: Boolean = false

private def executeNative(): ExecutionState = {
val result = nativeLib.executePlan(plan)

val flag = result(0)
if (flag == -1) EOF
else if (flag == 1) {
val numRows = result(1)
val addresses = result.slice(2, result.length)
Batch(numRows = numRows.toInt, addresses = addresses)
} else {
throw new IllegalStateException(s"Invalid native flag: $flag")
}
}

/**
* Creates a new configuration map to be passed to the native side.
*/
Expand Down Expand Up @@ -110,21 +96,22 @@ class CometExecIterator(
result
}

/** Execution result from Comet native */
trait ExecutionState

/** A new batch is available */
case class Batch(numRows: Int, addresses: Array[Long]) extends ExecutionState

/** The execution is finished - no more batch */
case object EOF extends ExecutionState

def getNextBatch(): Option[ColumnarBatch] = {
executeNative() match {
case EOF => None
case Batch(numRows, addresses) =>
// we execute the native plan each time we need another output batch and this could
// result in multiple input batches being processed
val result = nativeLib.executePlan(plan)

result(0) match {
case -1 =>
// EOF
None
case 1 =>
val numRows = result(1)
val addresses = result.slice(2, result.length)
val cometVectors = nativeUtil.importVector(addresses)
Some(new ColumnarBatch(cometVectors.toArray, numRows))
Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt))
case flag =>
throw new IllegalStateException(s"Invalid native flag: $flag")
}
}

Expand Down

0 comments on commit 615ac97

Please sign in to comment.