From 13cb52ed5ae5698b859df08204ff0594dd34816d Mon Sep 17 00:00:00 2001
From: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com>
Date: Wed, 15 Jan 2025 19:27:04 -0800
Subject: [PATCH] Fix slow scan operators (#3215)

This PR fixes the slow workflow execution issue by calling
desc.sourceSchema() only once, in the scan operator executor's
constructor, instead of once per output tuple.

It also includes some reformatting to remove unused imports.
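
For illustration only (simplified, hypothetical types rather than the real
Amber executor classes), the pattern applied by this change is to evaluate
the schema once at construction time and reuse it for every produced tuple:

    // Hypothetical sketch of the "cache the schema in the constructor" pattern.
    final case class Schema(attributeNames: Seq[String])

    final class ScanDesc(header: String) {
      // Stands in for the expensive schema derivation in the real operators.
      def sourceSchema(): Schema = Schema(header.split(",").toSeq)
    }

    final class ScanExec(desc: ScanDesc, rows: Iterator[Array[String]]) {
      // Evaluated exactly once per executor instance.
      private val schema: Schema = desc.sourceSchema()

      // Every tuple reuses the cached schema instead of calling
      // desc.sourceSchema() again.
      def produceTuple(): Iterator[Map[String, Any]] =
        rows.map(row => schema.attributeNames.zip(row).toMap)
    }

In the actual executors below, only the call site moves: the result of
desc.sourceSchema() is stored in a private val, and produceTuple() reads
that cached value.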
---
 .../amber/operator/source/scan/csv/CSVScanSourceOpExec.scala | 5 +++--
 .../source/scan/csv/ParallelCSVScanSourceOpExec.scala        | 2 +-
 .../operator/source/scan/json/JSONLScanSourceOpExec.scala    | 2 +-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
index df5953371e..eb99ae4f83 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala
@@ -3,7 +3,7 @@ package edu.uci.ics.amber.operator.source.scan.csv
 import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings}
 import edu.uci.ics.amber.core.executor.SourceOperatorExecutor
 import edu.uci.ics.amber.core.storage.DocumentFactory
-import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, TupleLike}
+import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, Schema, TupleLike}
 import edu.uci.ics.amber.util.JSONUtils.objectMapper
 
 import java.io.InputStreamReader
@@ -16,6 +16,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
   var parser: CsvParser = _
   var nextRow: Array[String] = _
   var numRowGenerated = 0
+  private val schema: Schema = desc.sourceSchema()
 
   override def produceTuple(): Iterator[TupleLike] = {
 
@@ -42,7 +43,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
       try {
         TupleLike(
           ArraySeq.unsafeWrapArray(
-            AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], desc.sourceSchema())
+            AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], schema)
           ): _*
         )
       } catch {
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
index 108050c397..f17a7901f2 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala
@@ -20,6 +20,7 @@ class ParallelCSVScanSourceOpExec private[csv] (
   val desc: ParallelCSVScanSourceOpDesc =
     objectMapper.readValue(descString, classOf[ParallelCSVScanSourceOpDesc])
   private var reader: BufferedBlockReader = _
+  private val schema = desc.sourceSchema()
 
   override def produceTuple(): Iterator[TupleLike] =
     new Iterator[TupleLike]() {
@@ -42,7 +43,6 @@ class ParallelCSVScanSourceOpExec private[csv] (
           return null
         }
 
-        val schema = desc.sourceSchema()
         // however the null values won't present if omitted in the end, we need to match nulls.
         if (fields.length != schema.getAttributes.size)
           fields = Stream
diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
index 9c4e9b01e0..bb13a06fdf 100644
--- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
+++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala
@@ -21,11 +21,11 @@ class JSONLScanSourceOpExec private[json] (
     objectMapper.readValue(descString, classOf[JSONLScanSourceOpDesc])
   private var rows: Iterator[String] = _
   private var reader: BufferedReader = _
+  private val schema = desc.sourceSchema()
 
   override def produceTuple(): Iterator[TupleLike] = {
     rows.flatMap { line =>
       Try {
-        val schema = desc.sourceSchema()
         val data = JSONToMap(objectMapper.readTree(line), desc.flatten).withDefaultValue(null)
         val fields = schema.getAttributeNames.map { fieldName =>
           parseField(data(fieldName), schema.getAttribute(fieldName).getType)