Skip to content

Commit

Permalink
Add more test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Nov 29, 2024
1 parent b4cf9ea commit 6da6ac2
Showing 1 changed file with 121 additions and 17 deletions.
138 changes: 121 additions & 17 deletions integration/src/test/scala/com/spotify/scio/bigquery/BigQueryIOIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,151 @@

package com.spotify.scio.bigquery

import com.spotify.scio.bigquery.BigQueryTypedTable.Format
import com.spotify.scio.coders.Coder
import com.spotify.scio.avro._
import org.apache.beam.sdk.options._
import com.spotify.scio.testing._
import com.spotify.scio.testing.util.ItUtils
import org.apache.avro.generic.GenericRecord

object BigQueryIOIT {
@BigQueryType.fromTable("bigquery-public-data:samples.shakespeare")
class ShakespeareFromTable

@BigQueryType.fromQuery(
"""
SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare` LIMIT 10
"""
"""SELECT word, word_count
|FROM `bigquery-public-data.samples.shakespeare`
|WHERE corpus = 'kinglear'
|ORDER BY word_count DESC
|LIMIT 5""".stripMargin
)
class ShakespeareFromQuery

val tempLocation: String = ItUtils.gcpTempLocation("bigquery-it")
}

class BigQueryIOIT extends PipelineSpec {
import BigQueryIOIT._
import ItUtils.project

val tempLocation: String = ItUtils.gcpTempLocation("bigquery-it")
val options: PipelineOptions = PipelineOptionsFactory
.fromArgs(s"--project=$project", s"--tempLocation=$tempLocation")
.create()

"Select" should "read typed values from a SQL query" in
val kinglearTop5: Seq[(String, Long)] = Seq(
"the" -> 786L,
"I" -> 622L,
"and" -> 594L,
"of" -> 447L,
"to" -> 438L
)

def extractCorpus(r: TableRow): String =
r.get("corpus").asInstanceOf[String]

def extractCorpus(r: GenericRecord): String =
r.get("corpus").asInstanceOf[String]

def extractWordCount(r: TableRow): (String, Long) = {
val word = r.get("word").asInstanceOf[String]
val count = r.get("word_count").asInstanceOf[String].toLong
word -> count
}

def extractWordCount(r: GenericRecord): (String, Long) = {
val word = r.get("word").asInstanceOf[String]
val count = r.get("word_count").asInstanceOf[String].toLong
word -> count
}

"Select" should "read values from a SQL query" in {
runWithRealContext(options) { sc =>
val query = Query(ShakespeareFromQuery.queryRaw)
val scoll = sc.bigQuerySelect(query).map(extractWordCount)

scoll should containInAnyOrder(kinglearTop5)
}
}

it should "read storage values from a SQL query" in {
runWithRealContext(options) { sc =>
val scoll = sc.read(BigQueryTyped[ShakespeareFromQuery])
scoll should haveSize(10)
scoll should satisfy[ShakespeareFromQuery] {
_.forall(_.getClass == classOf[ShakespeareFromQuery])
}
val query = Query(ShakespeareFromQuery.queryRaw)
val scoll = sc
.bigQueryStorage(query)
.map(extractWordCount)

scoll should containInAnyOrder(kinglearTop5)
}
}

"TableRef" should "read typed values from table" in
it should "read typed values from a SQL query" in {
runWithRealContext(options) { sc =>
val scoll = sc.read(BigQueryTyped[ShakespeareFromTable])
scoll.take(10) should haveSize(10)
scoll should satisfy[ShakespeareFromTable] {
_.forall(_.getClass == classOf[ShakespeareFromTable])
}
val scoll = sc
.typedBigQuery[ShakespeareFromQuery]()
.flatMap { r =>
for {
w <- r.word
c <- r.word_count
} yield w -> c
}

scoll should containInAnyOrder(kinglearTop5)
}
}

"Table" should "read values from table" in {
runWithRealContext(options) { sc =>
val table = Table.Spec(ShakespeareFromTable.table)
val scoll = sc
.bigQueryTable(table)
.filter(r => extractCorpus(r) == "kinglear")
.map(extractWordCount)
.top(5)(Ordering.by(_._2))
.flatten

scoll should containInAnyOrder(kinglearTop5)
}
}

it should "read avro values from table" in {
runWithRealContext(options) { sc =>
// BQ limitation: We can't give an avro reader schema
implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder
val table = Table.Spec(ShakespeareFromTable.table)
val scoll = sc
.bigQueryTable[GenericRecord](table, Format.GenericRecord)
.filter(r => extractCorpus(r) == "kinglear")
.map(extractWordCount)
.top(5)(Ordering.by(_._2))
.flatten

scoll should containInAnyOrder(kinglearTop5)
}
}

it should "read storage values from table" in {
runWithRealContext(options) { sc =>
val table = Table.Spec(ShakespeareFromTable.table)
val scoll = sc
.bigQueryStorage(table)
.filter(r => extractCorpus(r) == "kinglear")
.map(extractWordCount)
.top(5)(Ordering.by(_._2))
.flatten

scoll should containInAnyOrder(kinglearTop5)
}
}

it should "read typed values from table" in {
runWithRealContext(options) { sc =>
val scoll = sc
.typedBigQuery[ShakespeareFromTable]()
.collect { case r if r.corpus == "kinglear" => r.word -> r.word_count }
.top(5)(Ordering.by(_._2))
.flatten

scoll should containInAnyOrder(kinglearTop5)
}
}
}

0 comments on commit 6da6ac2

Please sign in to comment.