Skip to content

Commit

Permalink
Fix slow scan operators (#3215)
Browse files Browse the repository at this point in the history
This PR fixes the slow workflow execution issue by calling
desc.sourceSchema only once in the scan operator executor's constructor,
instead of for every output tuple.

This PR also includes some reformatting to remove unused imports.
  • Loading branch information
shengquan-ni authored Jan 16, 2025
1 parent 64007f7 commit 13cb52e
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package edu.uci.ics.amber.operator.source.scan.csv
import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings}
import edu.uci.ics.amber.core.executor.SourceOperatorExecutor
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, TupleLike}
import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, Schema, TupleLike}
import edu.uci.ics.amber.util.JSONUtils.objectMapper

import java.io.InputStreamReader
Expand All @@ -16,6 +16,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
var parser: CsvParser = _
var nextRow: Array[String] = _
var numRowGenerated = 0
private val schema: Schema = desc.sourceSchema()

override def produceTuple(): Iterator[TupleLike] = {

Expand All @@ -42,7 +43,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
try {
TupleLike(
ArraySeq.unsafeWrapArray(
AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], desc.sourceSchema())
AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], schema)
): _*
)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class ParallelCSVScanSourceOpExec private[csv] (
val desc: ParallelCSVScanSourceOpDesc =
objectMapper.readValue(descString, classOf[ParallelCSVScanSourceOpDesc])
private var reader: BufferedBlockReader = _
private val schema = desc.sourceSchema()

override def produceTuple(): Iterator[TupleLike] =
new Iterator[TupleLike]() {
Expand All @@ -42,7 +43,6 @@ class ParallelCSVScanSourceOpExec private[csv] (
return null
}

val schema = desc.sourceSchema()
// however, trailing null values may be omitted by the parser, so we need to pad the row with nulls to match the schema width.
if (fields.length != schema.getAttributes.size)
fields = Stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ class JSONLScanSourceOpExec private[json] (
objectMapper.readValue(descString, classOf[JSONLScanSourceOpDesc])
private var rows: Iterator[String] = _
private var reader: BufferedReader = _
private val schema = desc.sourceSchema()

override def produceTuple(): Iterator[TupleLike] = {
rows.flatMap { line =>
Try {
val schema = desc.sourceSchema()
val data = JSONToMap(objectMapper.readTree(line), desc.flatten).withDefaultValue(null)
val fields = schema.getAttributeNames.map { fieldName =>
parseField(data(fieldName), schema.getAttribute(fieldName).getType)
Expand Down

0 comments on commit 13cb52e

Please sign in to comment.