Skip to content

Commit

Permalink
Merge branch 'master' into Zhe-empty-csv-file
Browse files: browse the repository at this point in the history
  • Loading branch information
MiuMiuMiue authored Jan 16, 2025
2 parents 07e9257 + 13cb52e commit dec959f
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package edu.uci.ics.amber.operator.source.scan.csv
import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings}
import edu.uci.ics.amber.core.executor.SourceOperatorExecutor
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, TupleLike}
import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, Schema, TupleLike}
import edu.uci.ics.amber.util.JSONUtils.objectMapper

import java.io.InputStreamReader
Expand All @@ -16,6 +16,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
var parser: CsvParser = _
var nextRow: Array[String] = _
var numRowGenerated = 0
private val schema: Schema = desc.sourceSchema()

override def produceTuple(): Iterator[TupleLike] = {

Expand All @@ -42,7 +43,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
try {
TupleLike(
ArraySeq.unsafeWrapArray(
AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], desc.sourceSchema())
AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], schema)
): _*
)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class ParallelCSVScanSourceOpExec private[csv] (
val desc: ParallelCSVScanSourceOpDesc =
objectMapper.readValue(descString, classOf[ParallelCSVScanSourceOpDesc])
private var reader: BufferedBlockReader = _
private val schema = desc.sourceSchema()

override def produceTuple(): Iterator[TupleLike] =
new Iterator[TupleLike]() {
Expand All @@ -42,7 +43,6 @@ class ParallelCSVScanSourceOpExec private[csv] (
return null
}

val schema = desc.sourceSchema()
// However, null values will not be present if they are omitted at the end, so we need to match the nulls.
if (fields.length != schema.getAttributes.size)
fields = Stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ class JSONLScanSourceOpExec private[json] (
objectMapper.readValue(descString, classOf[JSONLScanSourceOpDesc])
private var rows: Iterator[String] = _
private var reader: BufferedReader = _
private val schema = desc.sourceSchema()

override def produceTuple(): Iterator[TupleLike] = {
rows.flatMap { line =>
Try {
val schema = desc.sourceSchema()
val data = JSONToMap(objectMapper.readTree(line), desc.flatten).withDefaultValue(null)
val fields = schema.getAttributeNames.map { fieldName =>
parseField(data(fieldName), schema.getAttribute(fieldName).getType)
Expand Down

0 comments on commit dec959f

Please sign in to comment.