Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve BQ typed support #5529

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.spotify.scio.tensorflow.syntax.SCollectionSyntax.tensorFlowPredictSCollectionOps"
),
// dropped custom BigQueryAvroUtilsWrapper
ProblemFilters.exclude[MissingClassProblem](
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryAvroUtilsWrapper"
)
)

Expand Down
138 changes: 121 additions & 17 deletions integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,151 @@

package com.spotify.scio.bigquery

import com.spotify.scio.bigquery.BigQueryTypedTable.Format
import com.spotify.scio.coders.Coder
import com.spotify.scio.avro._
import org.apache.beam.sdk.options._
import com.spotify.scio.testing._
import com.spotify.scio.testing.util.ItUtils
import org.apache.avro.generic.GenericRecord

object BigQueryIOIT {
@BigQueryType.fromTable("bigquery-public-data:samples.shakespeare")
class ShakespeareFromTable

@BigQueryType.fromQuery(
"""
SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare` LIMIT 10
"""
"""SELECT word, word_count
|FROM `bigquery-public-data.samples.shakespeare`
|WHERE corpus = 'kinglear'
|ORDER BY word_count DESC
|LIMIT 5""".stripMargin
)
class ShakespeareFromQuery

val tempLocation: String = ItUtils.gcpTempLocation("bigquery-it")
}

class BigQueryIOIT extends PipelineSpec {
import BigQueryIOIT._
import ItUtils.project

val tempLocation: String = ItUtils.gcpTempLocation("bigquery-it")
// Pipeline options shared by the tests in this spec: the GCP project and the
// temp location are handed to Beam as command-line style arguments.
val options: PipelineOptions = {
  val cliArgs = Seq(s"--project=$project", s"--tempLocation=$tempLocation")
  PipelineOptionsFactory.fromArgs(cliArgs: _*).create()
}

"Select" should "read typed values from a SQL query" in
// Expected (word, word_count) pairs that every read path in this spec is
// checked against (order is irrelevant, see `containInAnyOrder`).
val kinglearTop5: Seq[(String, Long)] = Seq(
  ("the", 786L),
  ("I", 622L),
  ("and", 594L),
  ("of", 447L),
  ("to", 438L)
)

/** Reads the `corpus` column of a `TableRow` as a `String`. */
def extractCorpus(r: TableRow): String = {
  val corpus = r.get("corpus")
  corpus.asInstanceOf[String]
}

/**
 * Reads the `corpus` field of an Avro record as a `String`.
 *
 * Avro's generic reader materializes string fields as `CharSequence` instances
 * (typically `org.apache.avro.util.Utf8`) rather than `java.lang.String`, so a
 * direct cast to `String` fails with a `ClassCastException`. The value is
 * converted through `toString` instead, which also works when it already is a
 * `String`.
 *
 * @param r the Avro record to read from
 * @return the corpus name, or `null` when the field is unset
 */
def extractCorpus(r: GenericRecord): String =
  Option(r.get("corpus")).map(_.toString).orNull

/**
 * Builds a `(word, word_count)` pair from a `TableRow`.
 *
 * Both columns are read as `String`; `word_count` is then parsed to a `Long`.
 */
def extractWordCount(r: TableRow): (String, Long) = {
  val w = r.get("word").asInstanceOf[String]
  val rawCount = r.get("word_count").asInstanceOf[String]
  (w, rawCount.toLong)
}

/**
 * Builds a `(word, word_count)` pair from an Avro record.
 *
 * Avro's generic reader yields `CharSequence` values (typically
 * `org.apache.avro.util.Utf8`) for string fields and boxed numbers for integer
 * fields, so casting either field to `String` fails with a
 * `ClassCastException`. The word is therefore converted with `toString`, and
 * the count is taken from any `java.lang.Number`; a textual count is still
 * parsed, as before.
 *
 * @param r the Avro record to read from
 * @return the word paired with its count
 */
def extractWordCount(r: GenericRecord): (String, Long) = {
  val word = r.get("word").toString
  val count = r.get("word_count") match {
    case n: java.lang.Number => n.longValue()
    // fallback for textual representations of the count
    case other => other.toString.toLong
  }
  word -> count
}

"Select" should "read values from a SQL query" in {
  runWithRealContext(options) { sc =>
    // run the raw SQL behind ShakespeareFromQuery and keep (word, count) pairs
    val wordCounts = sc
      .bigQuerySelect(Query(ShakespeareFromQuery.queryRaw))
      .map(extractWordCount)

    wordCounts should containInAnyOrder(kinglearTop5)
  }
}

it should "read storage values from a SQL query" in {
runWithRealContext(options) { sc =>
val scoll = sc.read(BigQueryTyped[ShakespeareFromQuery])
scoll should haveSize(10)
scoll should satisfy[ShakespeareFromQuery] {
_.forall(_.getClass == classOf[ShakespeareFromQuery])
}
val query = Query(ShakespeareFromQuery.queryRaw)
val scoll = sc
.bigQueryStorage(query)
.map(extractWordCount)

scoll should containInAnyOrder(kinglearTop5)
}
}

"TableRef" should "read typed values from table" in
it should "read typed values from a SQL query" in {
runWithRealContext(options) { sc =>
val scoll = sc.read(BigQueryTyped[ShakespeareFromTable])
scoll.take(10) should haveSize(10)
scoll should satisfy[ShakespeareFromTable] {
_.forall(_.getClass == classOf[ShakespeareFromTable])
}
val scoll = sc
.typedBigQuery[ShakespeareFromQuery]()
.flatMap { r =>
for {
w <- r.word
c <- r.word_count
} yield w -> c
}

scoll should containInAnyOrder(kinglearTop5)
}
}

"Table" should "read values from table" in {
  runWithRealContext(options) { sc =>
    // rank by the count component of each (word, count) pair
    val byCount = Ordering.by[(String, Long), Long](_._2)

    val rows = sc.bigQueryTable(Table.Spec(ShakespeareFromTable.table))
    val kinglearCounts = rows
      .filter(row => extractCorpus(row) == "kinglear")
      .map(extractWordCount)
    val topWords = kinglearCounts.top(5)(byCount).flatten

    topWords should containInAnyOrder(kinglearTop5)
  }
}

it should "read avro values from table" in {
  runWithRealContext(options) { sc =>
    // BQ limitation: no Avro reader schema can be supplied, so a schema-less
    // GenericRecord coder is put in implicit scope
    implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder
    // rank by the count component of each (word, count) pair
    val byCount = Ordering.by[(String, Long), Long](_._2)

    val records = sc.bigQueryTable[GenericRecord](
      Table.Spec(ShakespeareFromTable.table),
      Format.GenericRecord
    )
    val topWords = records
      .filter(rec => extractCorpus(rec) == "kinglear")
      .map(extractWordCount)
      .top(5)(byCount)
      .flatten

    topWords should containInAnyOrder(kinglearTop5)
  }
}

it should "read storage values from table" in {
  runWithRealContext(options) { sc =>
    // rank by the count component of each (word, count) pair
    val byCount = Ordering.by[(String, Long), Long](_._2)

    val rows = sc.bigQueryStorage(Table.Spec(ShakespeareFromTable.table))
    val kinglearCounts = rows
      .filter(row => extractCorpus(row) == "kinglear")
      .map(extractWordCount)
    val topWords = kinglearCounts.top(5)(byCount).flatten

    topWords should containInAnyOrder(kinglearTop5)
  }
}

it should "read typed values from table" in {
  runWithRealContext(options) { sc =>
    // rank by the count component of each (word, count) pair
    val byCount = Ordering.by[(String, Long), Long](_._2)

    val topWords = sc
      .typedBigQuery[ShakespeareFromTable]()
      .filter(_.corpus == "kinglear")
      .map(row => row.word -> row.word_count)
      .top(5)(byCount)
      .flatten

    topWords should containInAnyOrder(kinglearTop5)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ object TypedBigQueryIT {
timestamp: Instant,
date: LocalDate,
time: LocalTime,
datetime: LocalDateTime,
// BQ DATETIME is problematic with Avro because the BQ API uses different representations:
// - BQ export uses 'string(datetime)'
// - BQ load uses 'long(local-timestamp-micros)'
// BigQueryType's avroSchema favors the read path and uses the string type
// datetime: LocalDateTime,
geography: Geography,
json: Json,
bigNumeric: BigNumeric
Expand Down Expand Up @@ -116,8 +120,9 @@ object TypedBigQueryIT {
class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
import TypedBigQueryIT._

private val bq = BigQuery.defaultInstance()

override protected def afterAll(): Unit = {
val bq = BigQuery.defaultInstance()
// best effort cleanup
Try(bq.tables.delete(typedTable.ref))
Try(bq.tables.delete(tableRowTable.ref))
Expand Down Expand Up @@ -153,9 +158,9 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
}
}

// TODO fix if in beam 2.61
ignore should "handle records as avro format" in {
it should "handle records as avro format" in {
implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema)

runWithRealContext(options) { sc =>
sc.parallelize(records)
.map(Record.toAvro)
Expand All @@ -167,7 +172,8 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
}.waitUntilFinish()

runWithRealContext(options) { sc =>
val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
val data =
sc.bigQueryTable(avroTable, Format.GenericRecordWithLogicalTypes).map(Record.fromAvro)
data should containInAnyOrder(records)
}
}
Expand Down

This file was deleted.

Loading
Loading