Merge remote-tracking branch 'origin/main' into kellen/jPQcoder
RustedBones committed Nov 27, 2024
2 parents e334184 + a1fce09 commit 92fad69
Showing 13 changed files with 235 additions and 190 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -366,7 +366,7 @@ jobs:
        run: sbt '++ ${{ matrix.scala }}' coverage test coverageAggregate

      - name: Upload coverage report
-       uses: codecov/codecov-action@v4
+       uses: codecov/codecov-action@v5
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
8 changes: 7 additions & 1 deletion build.sbt
@@ -294,7 +294,7 @@ ThisBuild / githubWorkflowAddedJobs ++= Seq(
name = Some("Test coverage")
),
WorkflowStep.Use(
UseRef.Public("codecov", "codecov-action", "v4"),
UseRef.Public("codecov", "codecov-action", "v5"),
Map("token" -> "${{ secrets.CODECOV_TOKEN }}"),
name = Some("Upload coverage report")
)
@@ -424,6 +424,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
  ),
  ProblemFilters.exclude[DirectAbstractMethodProblem](
    "org.apache.beam.sdk.coders.Coder.getCoderArguments"
+ ),
+ // added BQ Json object
+ ProblemFilters.exclude[MissingTypesProblem](
+   "com.spotify.scio.bigquery.types.package$Json$"
  )
)
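
The new MiMa exclusion above accompanies the BigQuery Json wrapper this merge brings in from main. A minimal sketch of how the type appears in an annotated case class (the Event class and its fields are hypothetical; the single-string Json(...) constructor matches its use in TypedBigQueryIT.scala below):

  import com.spotify.scio.bigquery.types.{BigQueryType, Json}

  @BigQueryType.toTable
  case class Event(id: Long, payload: Json)

  // e.g. Event(1L, Json("""{"k":"v"}"""))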

@@ -962,6 +966,7 @@ lazy val `scio-google-cloud-platform` = project
    libraryDependencies ++= Seq(
      // compile
      "com.esotericsoftware" % "kryo-shaded" % kryoVersion,
+     "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
      "com.google.api" % "gax" % gcpBom.key.value,
      "com.google.api" % "gax-grpc" % gcpBom.key.value,
      "com.google.api-client" % "google-api-client" % gcpBom.key.value,
@@ -1714,6 +1719,7 @@ lazy val integration = project
    unusedCompileDependenciesTest := unusedCompileDependenciesTestSkipped.value,
    libraryDependencies ++= Seq(
      // compile
+     "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
      "com.google.api-client" % "google-api-client" % gcpBom.key.value,
      "com.google.apis" % "google-api-services-bigquery" % googleApiServicesBigQueryVersion,
      "com.google.guava" % "guava" % guavaVersion,
TypedBigQueryIT.scala
@@ -18,21 +18,21 @@
package com.spotify.scio.bigquery

import com.google.protobuf.ByteString
import com.spotify.scio._
import com.spotify.scio.avro._
+import com.spotify.scio.coders.Coder
import com.spotify.scio.bigquery.BigQueryTypedTable.Format
import com.spotify.scio.bigquery.client.BigQuery
+import com.spotify.scio.bigquery.types.{BigNumeric, Geography, Json}
import com.spotify.scio.testing._
import magnolify.scalacheck.auto._
-import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
+import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import org.joda.time.format.DateTimeFormat
import org.scalacheck._
import org.scalatest.BeforeAndAfterAll

-import scala.util.Random
+import scala.util.{Random, Try}

object TypedBigQueryIT {
@BigQueryType.toTable
@@ -42,24 +42,53 @@ object TypedBigQueryIT {
    long: Long,
    float: Float,
    double: Double,
+   numeric: BigDecimal,
    string: String,
    byteString: ByteString,
    timestamp: Instant,
    date: LocalDate,
    time: LocalTime,
-   datetime: LocalDateTime
+   datetime: LocalDateTime,
+   geography: Geography,
+   json: Json,
+   bigNumeric: BigNumeric
  )

- // Workaround for millis rounding error
- val epochGen: Gen[Long] = Gen.chooseNum[Long](0L, 1000000000000L).map(x => x / 1000 * 1000)
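  // [editor's note] In arbBigDecimal below, max = 10^precision - 1 is the largest
  // unscaled value with `precision` digits; BigDecimal(_, scale) then places the
  // decimal point, so generated values fit a NUMERIC(precision, scale) column.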
+ def arbBigDecimal(precision: Int, scale: Int): Arbitrary[BigDecimal] = Arbitrary {
+   val max = BigInt(10).pow(precision) - 1
+   Gen.choose(-max, max).map(BigDecimal(_, scale))
+ }
+
+ implicit val arbNumeric: Arbitrary[BigDecimal] =
+   arbBigDecimal(Numeric.MaxNumericPrecision, Numeric.MaxNumericScale)
implicit val arbString: Arbitrary[String] = Arbitrary(Gen.alphaStr)
implicit val arbByteString: Arbitrary[ByteString] = Arbitrary(
Gen.alphaStr.map(ByteString.copyFromUtf8)
)
+ // Workaround for millis rounding error
+ val epochGen: Gen[Long] = Gen.chooseNum[Long](0L, 1000000000000L).map(x => x / 1000 * 1000)
implicit val arbInstant: Arbitrary[Instant] = Arbitrary(epochGen.map(new Instant(_)))
implicit val arbDate: Arbitrary[LocalDate] = Arbitrary(epochGen.map(new LocalDate(_)))
implicit val arbTime: Arbitrary[LocalTime] = Arbitrary(epochGen.map(new LocalTime(_)))
implicit val arbDatetime: Arbitrary[LocalDateTime] = Arbitrary(epochGen.map(new LocalDateTime(_)))
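  // [editor's note] Gen.numChar yields a single digit and Gen.alphaStr an
  // alphabetic string, so the generators below emit minimal well-formed values:
  // WKT points like "POINT(3 7)" and single-entry JSON objects.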
+ implicit val arbGeography: Arbitrary[Geography] = Arbitrary(
+   for {
+     x <- Gen.numChar
+     y <- Gen.numChar
+   } yield Geography(s"POINT($x $y)")
+ )
+ implicit val arbJson: Arbitrary[Json] = Arbitrary(
+   for {
+     key <- Gen.alphaStr
+     value <- Gen.alphaStr
+   } yield Json(s"""{"$key":"$value"}""")
+ )
+
+ implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
+   // Precision: 76.76 (the 77th digit is partial)
+   arbBigDecimal(BigNumeric.MaxNumericPrecision - 1, BigNumeric.MaxNumericScale).arbitrary
+     .map(BigNumeric.apply)
+ }

private val recordGen =
implicitly[Arbitrary[Record]].arbitrary
@@ -71,9 +100,9 @@ object TypedBigQueryIT {
s"data-integration-test:bigquery_avro_it.$name${now}_${Random.nextInt(Int.MaxValue)}"
Table.Spec(spec)
}
+ private val typedTable = table("records")
private val tableRowTable = table("records_tablerow")
private val avroTable = table("records_avro")
- private val avroLogicalTypeTable = table("records_avro_logical_type")

private val records = Gen.listOfN(100, recordGen).sample.get
private val options = PipelineOptionsFactory
@@ -87,122 +116,59 @@
class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
import TypedBigQueryIT._

- override protected def beforeAll(): Unit = {
-   val sc = ScioContext(options)
-   sc.parallelize(records).saveAsTypedBigQueryTable(tableRowTable)
-
-   sc.run()
-   ()
- }
-
  override protected def afterAll(): Unit = {
-   BigQuery.defaultInstance().tables.delete(tableRowTable.ref)
-   BigQuery.defaultInstance().tables.delete(avroTable.ref)
-   BigQuery.defaultInstance().tables.delete(avroLogicalTypeTable.ref)
- }

"TypedBigQuery" should "read records" in {
val sc = ScioContext(options)
sc.typedBigQuery[Record](tableRowTable) should containInAnyOrder(records)
sc.run()
val bq = BigQuery.defaultInstance()
// best effort cleanup
Try(bq.tables.delete(typedTable.ref))
Try(bq.tables.delete(tableRowTable.ref))
Try(bq.tables.delete(avroTable.ref))
}
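  // [editor's note] Each rewritten test below runs two real pipelines: the first
  // writes the generated records and blocks on waitUntilFinish(), the second
  // reads them back and asserts the round trip.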

it should "convert to avro format" in {
val sc = ScioContext(options)
implicit val coder = avroGenericRecordCoder(Record.avroSchema)
sc.typedBigQuery[Record](tableRowTable)
.map(Record.toAvro)
.map(Record.fromAvro) should containInAnyOrder(
records
)
sc.run()
}
"TypedBigQuery" should "handle records as TableRow" in {
runWithRealContext(options) { sc =>
sc.parallelize(records)
.saveAsTypedBigQueryTable(typedTable, createDisposition = CREATE_IF_NEEDED)
}.waitUntilFinish()

"BigQueryTypedTable" should "read TableRow records" in {
val sc = ScioContext(options)
sc
.bigQueryTable(tableRowTable)
.map(Record.fromTableRow) should containInAnyOrder(records)
sc.run()
runWithRealContext(options) { sc =>
val data = sc.typedBigQuery[Record](typedTable)
data should containInAnyOrder(records)
}
}

it should "read GenericRecord recors" in {
val sc = ScioContext(options)
implicit val coder = avroGenericRecordCoder(Record.avroSchema)
sc
.bigQueryTable(tableRowTable, Format.GenericRecord)
.map(Record.fromAvro) should containInAnyOrder(records)
sc.run()
"BigQueryTypedTable" should "handle records as TableRow format" in {
runWithRealContext(options) { sc =>
sc.parallelize(records)
.map(Record.toTableRow)
.saveAsBigQueryTable(
tableRowTable,
schema = Record.schema,
createDisposition = CREATE_IF_NEEDED
)
}.waitUntilFinish()

runWithRealContext(options) { sc =>
val data = sc.bigQueryTable(tableRowTable).map(Record.fromTableRow)
data should containInAnyOrder(records)
}
}

it should "write GenericRecord records" in {
val sc = ScioContext(options)
implicit val coder = avroGenericRecordCoder(Record.avroSchema)
val schema =
BigQueryUtil.parseSchema("""
|{
| "fields": [
| {"mode": "NULLABLE", "name": "bool", "type": "BOOLEAN"},
| {"mode": "NULLABLE", "name": "int", "type": "INTEGER"},
| {"mode": "NULLABLE", "name": "long", "type": "INTEGER"},
| {"mode": "NULLABLE", "name": "float", "type": "FLOAT"},
| {"mode": "NULLABLE", "name": "double", "type": "FLOAT"},
| {"mode": "NULLABLE", "name": "string", "type": "STRING"},
| {"mode": "NULLABLE", "name": "byteString", "type": "BYTES"},
| {"mode": "NULLABLE", "name": "timestamp", "type": "INTEGER"},
| {"mode": "NULLABLE", "name": "date", "type": "STRING"},
| {"mode": "NULLABLE", "name": "time", "type": "STRING"},
| {"mode": "NULLABLE", "name": "datetime", "type": "STRING"}
| ]
|}
""".stripMargin)
val tap = sc
.bigQueryTable(tableRowTable, Format.GenericRecord)
.saveAsBigQueryTable(avroTable, schema = schema, createDisposition = CREATE_IF_NEEDED)

val result = sc.run().waitUntilDone()
result.tap(tap).map(Record.fromAvro).value.toSet shouldBe records.toSet
// TODO fix if in beam 2.61
ignore should "handle records as avro format" in {
implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema)
runWithRealContext(options) { sc =>
sc.parallelize(records)
.map(Record.toAvro)
.saveAsBigQueryTable(
avroTable,
schema = Record.schema, // This is a bad API. an avro schema should be expected
createDisposition = CREATE_IF_NEEDED
)
}.waitUntilFinish()

runWithRealContext(options) { sc =>
val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
data should containInAnyOrder(records)
}
}

it should "write GenericRecord records with logical types" in {
val sc = ScioContext(options)
// format: off
val schema: Schema = SchemaBuilder
.record("Record")
.namespace("com.spotify.scio.bigquery")
.fields()
.name("date").`type`(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).withDefault(0)
.name("time").`type`(LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG))).withDefault(0L)
.name("datetime").`type`().stringType().stringDefault("")
.endRecord()
// format: on

implicit val coder = avroGenericRecordCoder(schema)
val ltRecords: Seq[GenericRecord] =
Seq(
new GenericRecordBuilder(schema)
.set("date", 10)
.set("time", 1000L)
.set("datetime", "2020-08-03 11:11:11")
.build()
)

val tableSchema =
BigQueryUtil.parseSchema("""
|{
| "fields": [
| {"mode": "REQUIRED", "name": "date", "type": "DATE"},
| {"mode": "REQUIRED", "name": "time", "type": "TIME"},
| {"mode": "REQUIRED", "name": "datetime", "type": "STRING"}
| ]
|}
""".stripMargin)
val tap = sc
.parallelize(ltRecords)
.saveAsBigQueryTable(avroLogicalTypeTable, tableSchema, createDisposition = CREATE_IF_NEEDED)

val result = sc.run().waitUntilDone()
result.tap(tap).value.toList.size shouldBe 1
}

}
