Stable behavior
RustedBones committed Nov 28, 2024
1 parent 98e3faf commit 35a1b7e
Showing 4 changed files with 51 additions and 22 deletions.
@@ -177,14 +177,16 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
         .saveAsBigQueryTable(
           avroTable,
           schema = Record.schema, // This is a bad API. an avro schema should be expected
-          createDisposition = CREATE_IF_NEEDED
+          createDisposition = CREATE_IF_NEEDED,
+          configOverride = _.useAvroLogicalTypes() // annoying this is not exposed
         )
     }.waitUntilFinish()
 
     waitForTable(avroTable)
 
     runWithRealContext(options) { sc =>
-      val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
+      val data =
+        sc.bigQueryTable(avroTable, Format.GenericRecordWithLogicalTypes).map(Record.fromAvro)
       data should containInAnyOrder(records)
     }
   }
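The test now opts into avro logical types on both the write (`configOverride = _.useAvroLogicalTypes()`) and the read (`Format.GenericRecordWithLogicalTypes`). For context, a minimal sketch of an avro schema that carries logical types; the field names are illustrative and not taken from the test's `Record`:

    import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}

    object LogicalTypeSchemaSketch {
      // With logical types enabled, BigQuery DATE/TIMESTAMP columns round-trip as
      // avro `date` (int) and `timestamp-micros` (long) instead of STRING.
      val schema: Schema = SchemaBuilder
        .record("Record")
        .namespace("example")
        .fields()
        .requiredLong("id")
        .name("date").`type`(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).noDefault()
        .name("ts").`type`(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))).noDefault()
        .endRecord()
    }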
@@ -328,7 +328,11 @@ object BigQueryTypedTable {
   /** Defines the format in which BigQuery can be read and written to. */
   sealed abstract class Format[F]
   object Format {
-    case object GenericRecord extends Format[GenericRecord]
+    sealed abstract private[bigquery] class AvroFormat(val useLogicalTypes: Boolean)
+        extends Format[GenericRecord]
+
+    case object GenericRecord extends AvroFormat(false)
+    case object GenericRecordWithLogicalTypes extends AvroFormat(true)
     case object TableRow extends Format[TableRow]
   }
 
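Both avro case objects now extend a package-private `AvroFormat` parent carrying a `useLogicalTypes` flag, so existing code using `Format.GenericRecord` keeps compiling while the IO layer can branch on a single flag. From user code the two formats are simply values passed to the read/write APIs; a small sketch (hypothetical helper, not part of the commit):

    import com.spotify.scio.bigquery.BigQueryTypedTable.Format

    object FormatSketch {
      // Caller-side view of the format hierarchy after this change.
      def describe(format: Format[_]): String = format match {
        case Format.GenericRecord                 => "GenericRecord, DATE/TIME/DATETIME as STRING"
        case Format.GenericRecordWithLogicalTypes => "GenericRecord with avro logical types"
        case Format.TableRow                      => "TableRow"
      }
    }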
@@ -389,25 +393,31 @@ object BigQueryTypedTable {
   )(coders.tableRowCoder)
 
   private[this] def genericRecord(
-    table: Table
+    table: Table,
+    useLogicalTypes: Boolean
   )(implicit c: Coder[GenericRecord]): BigQueryTypedTable[GenericRecord] =
     BigQueryTypedTable(
-      _.getRecord(),
-      identity[GenericRecord],
-      (genericRecord: GenericRecord, _: TableSchema) => genericRecord,
-      table
+      beam.BigQueryIO
+        .read(_.getRecord)
+        .pipe(r => if (useLogicalTypes) r.useAvroLogicalTypes() else r),
+      beam.BigQueryIO
+        .write[GenericRecord]()
+        .withAvroFormatFunction(_.getElement)
+        .pipe(r => if (useLogicalTypes) r.useAvroLogicalTypes() else r),
+      table,
+      (genericRecord: GenericRecord, _: TableSchema) => genericRecord
     )
 
   /**
    * Creates a new instance of [[BigQueryTypedTable]] based on the supplied [[Format]].
    *
    * NOTE: LogicalType support when using `Format.GenericRecord` has some caveats: Reading: Bigquery
-   * types DATE, TIME, DATIME will be read as STRING. Writing: Supports LogicalTypes only for DATE
-   * and TIME. DATETIME is not yet supported. https://issuetracker.google.com/issues/140681683
+   * types DATE, TIME, DATETIME will be read as STRING. Use `Format.GenericRecordWithLogicalTypes`
+   * for avro `date`, `timestamp-micros` and `local-timestamp-micros` (avro 1.10+).
   */
   def apply[F: Coder](table: Table, format: Format[F]): BigQueryTypedTable[F] =
     format match {
-      case Format.GenericRecord => genericRecord(table)
+      case f: Format.AvroFormat => genericRecord(table, f.useLogicalTypes)
       case Format.TableRow => tableRow(table)
     }
 
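With logical types enabled, date/time values in the returned `GenericRecord` carry avro logical-type annotations rather than being exported as strings; the concrete runtime value still depends on which avro conversions are registered. A defensive sketch of normalising a BigQuery DATE field, assuming nothing beyond standard avro and java.time:

    import java.time.LocalDate
    import org.apache.avro.generic.GenericRecord

    object DateFieldSketch {
      // Handle the common representations a DATE column may arrive in, depending on
      // whether Format.GenericRecord or Format.GenericRecordWithLogicalTypes was used.
      def readDate(record: GenericRecord, field: String): LocalDate =
        record.get(field) match {
          case d: LocalDate    => d                                      // logical-type conversion already applied
          case days: Integer   => LocalDate.ofEpochDay(days.longValue()) // raw `date` encoding: days since epoch
          case s: CharSequence => LocalDate.parse(s.toString)            // plain Format.GenericRecord: STRING
          case other           => sys.error(s"unexpected DATE value: $other")
        }
    }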
@@ -444,9 +454,6 @@ object BigQueryTypedTable {
       .write[T]()
       .useAvroLogicalTypes()
       .withAvroFormatFunction(input => wFn(input.getElement()))
-      .withAvroSchemaFactory { ts =>
-        BigQueryUtils.toGenericAvroSchema("root", ts.getFields(), true)
-      }
 
     BigQueryTypedTable(reader, writer, table, fn)
   }
@@ -740,12 +747,31 @@ object BigQueryTyped {
     override type ReadP = Unit
     override type WriteP = Table.WriteParam[T]
 
-    private val underlying = BigQueryTypedTable[T](
-      (i: SchemaAndRecord) => BigQueryType[T].fromAvro(i.getRecord),
-      BigQueryType[T].toTableRow,
-      BigQueryType[T].fromTableRow,
-      table
-    )
+    private val underlying = {
+      val readFn = Functions.serializableFn[SchemaAndRecord, T] { x =>
+        BigQueryType[T].fromAvro(x.getRecord)
+      }
+      val writeFn = Functions.serializableFn[AvroWriteRequest[T], GenericRecord] { x =>
+        BigQueryType[T].toAvro(x.getElement)
+      }
+      val schemaFactory = Functions.serializableFn[TableSchema, org.apache.avro.Schema] { _ =>
+        BigQueryType[T].avroSchema
+      }
+      val parseFn = (r: GenericRecord, _: TableSchema) => BigQueryType[T].fromAvro(r)
+
+      BigQueryTypedTable[T](
+        beam.BigQueryIO
+          .read(readFn)
+          .useAvroLogicalTypes(),
+        beam.BigQueryIO
+          .write[T]()
+          .withAvroFormatFunction(writeFn)
+          .withAvroSchemaFactory(schemaFactory)
+          .useAvroLogicalTypes(),
+        table,
+        parseFn
+      )
+    }
 
     override def testId: String = s"BigQueryIO(${table.spec})"
 
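`Functions.serializableFn` is scio's existing helper for lifting a Scala function into Beam's `SerializableFunction`, which is what `BigQueryIO.read`, `withAvroFormatFunction`, and `withAvroSchemaFactory` expect. A minimal equivalent sketch for reference:

    import org.apache.beam.sdk.transforms.SerializableFunction

    object SerializableFnSketch {
      // Wrap a Scala function so Beam can serialize it with the transform; the closure
      // itself must be serializable, which Scala function literals are.
      def serializableFn[A, B](f: A => B): SerializableFunction[A, B] =
        new SerializableFunction[A, B] {
          override def apply(input: A): B = f(input)
        }
    }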
@@ -79,7 +79,8 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
    * Reading records as GenericRecord **should** offer better performance over TableRow records.
    *
    * Note: When using `Format.GenericRecord` Bigquery types DATE, TIME and DATETIME are read as
-   * STRING.
+   * STRING. Use `Format.GenericRecordWithLogicalTypes` for avro `date`, `timestamp-micros` and
+   * `local-timestamp-micros` (avro 1.10+).
    */
   def bigQueryTable[F: Coder](table: Table, format: Format[F]): SCollection[F] =
     self.read(BigQueryTypedTable(table, format))
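A short usage sketch of the updated read path; the project, dataset, table and output path are placeholders, and an implicit `Coder[GenericRecord]` is assumed to be resolvable in the calling scope:

    import com.spotify.scio.ContextAndArgs
    import com.spotify.scio.bigquery._
    import com.spotify.scio.bigquery.BigQueryTypedTable.Format

    object ReadWithLogicalTypes {
      def main(cmdlineArgs: Array[String]): Unit = {
        val (sc, _) = ContextAndArgs(cmdlineArgs)
        val table = Table.Spec("my-project:my_dataset.my_table") // placeholder

        // DATE/TIME/TIMESTAMP columns keep their avro logical types instead of STRING.
        sc.bigQueryTable(table, Format.GenericRecordWithLogicalTypes)
          .map(r => r.get("id").toString) // assumes an "id" column; illustrative only
          .saveAsTextFile("gs://my-bucket/out") // placeholder

        sc.run()
      }
    }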
@@ -334,7 +334,7 @@ private[types] object TypeProvider {
q"override def schema: ${p(c, GModel)}.TableSchema = ${p(c, SUtil)}.parseSchema(${schema.toString})"
}
val defAvroSchema =
q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(${cName.toString}, this.schema.getFields)"
q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(this.schema, true)"
val defToPrettyString =
q"override def toPrettyString(indent: Int = 0): String = ${p(c, s"$SBQ.types.SchemaUtil")}.toPrettyString(this.schema, ${cName.toString}, indent)"

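The generated `avroSchema` is now derived from the full `TableSchema` with logical types enabled, so it lines up with the avro write path above. A sketch of what this means for an annotated class; the joda-time field mapping shown is an assumption for illustration, not something introduced by this commit:

    import com.spotify.scio.bigquery.types.BigQueryType
    import org.joda.time.LocalDate

    object AvroSchemaSketch {
      @BigQueryType.toTable
      case class DateRow(id: Long, day: LocalDate) // `day` maps to a BigQuery DATE column (assumed)

      // With toGenericAvroSchema(this.schema, true), the DATE column should surface in the
      // generated avro schema as an int field carrying the `date` logical type, not a string.
      val generatedSchema: org.apache.avro.Schema = BigQueryType[DateRow].avroSchema
    }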
