Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to BigQuery read API v1 interface #5431

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ project/plugins/project/
.metals/
metals.sbt
.vscode/
*.iml

# scio
.bigquery/
Expand Down
15 changes: 14 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,19 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
// relax type hierarchy for batch stream
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"com.spotify.scio.grpc.GrpcBatchDoFn.asyncLookup"
),
// BQ api v1 update
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.spotify.scio.bigquery.BigQueryStorageTap.*"
),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"com.spotify.scio.bigquery.BigQueryStorageTap.*"
),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"com.spotify.scio.bigquery.BigQueryTaps.*"
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.spotify.scio.bigquery.StorageUtil.tableReadOptions"
)
)

Expand Down Expand Up @@ -979,7 +992,7 @@ lazy val `scio-google-cloud-platform` = project
"com.google.api" % "gax-grpc" % gcpBom.key.value,
"com.google.api-client" % "google-api-client" % gcpBom.key.value,
"com.google.api.grpc" % "grpc-google-cloud-pubsub-v1" % gcpBom.key.value,
"com.google.api.grpc" % "proto-google-cloud-bigquerystorage-v1beta1" % gcpBom.key.value,
"com.google.api.grpc" % "proto-google-cloud-bigquerystorage-v1" % gcpBom.key.value,
"com.google.api.grpc" % "proto-google-cloud-bigtable-admin-v2" % gcpBom.key.value,
"com.google.api.grpc" % "proto-google-cloud-bigtable-v2" % gcpBom.key.value,
"com.google.api.grpc" % "proto-google-cloud-datastore-v1" % gcpBom.key.value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.spotify.scio.bigquery.types

import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions
import com.google.protobuf.ByteString
import com.spotify.scio._
import com.spotify.scio.bigquery.BigQueryTaps._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.spotify.scio.bigquery

import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema}
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions
import org.apache.avro.Schema
import org.apache.avro.Schema.Type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.google.api.services.bigquery.model._
import com.google.auth.Credentials
import com.google.auth.http.HttpCredentialsAdapter
import com.google.auth.oauth2.{GoogleCredentials, ImpersonatedCredentials}
import com.google.cloud.bigquery.storage.v1beta1.{BigQueryStorageClient, BigQueryStorageSettings}
import com.google.cloud.bigquery.storage.v1.{BigQueryReadClient, BigQueryReadSettings}
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer
import com.spotify.scio.bigquery.{Table => STable}
import com.spotify.scio.bigquery.client.BigQuery.Client
Expand Down Expand Up @@ -318,18 +318,18 @@ object BigQuery {
.build()
}

lazy val storage: BigQueryStorageClient = {
val settings = BigQueryStorageSettings
lazy val storage: BigQueryReadClient = {
val settings = BigQueryReadSettings
.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(credentials))
.setTransportChannelProvider(
BigQueryStorageSettings
BigQueryReadSettings
.defaultGrpcTransportProviderBuilder()
.setHeaderProvider(FixedHeaderProvider.create("user-agent", "scio"))
.build()
)
.build()
BigQueryStorageClient.create(settings)
BigQueryReadClient.create(settings)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package com.spotify.scio.bigquery.client

import com.google.api.client.googleapis.json.GoogleJsonResponseException
import com.google.api.services.bigquery.model._
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions
import com.google.cloud.bigquery.storage.v1beta1.Storage._
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest
import com.google.cloud.bigquery.storage.v1.DataFormat
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest
import com.google.cloud.bigquery.storage.v1.ReadSession
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions
import com.google.cloud.hadoop.util.ApiErrorExtractor
import com.spotify.scio.bigquery.client.BigQuery.Client
import com.spotify.scio.bigquery.{BigQuerySysProps, StorageUtil, Table => STable, TableRow}
Expand Down Expand Up @@ -72,29 +74,27 @@ final private[client] class TableOps(client: Client) {
}

def storageAvroRows(table: STable, readOptions: TableReadOptions): Iterator[GenericRecord] = {
val tableRefProto = TableReferenceProto.TableReference
val tableProjectId = Option(table.ref.getProjectId).getOrElse(client.project)
val tableUrn =
s"projects/${tableProjectId}/datasets/${table.ref.getDatasetId}/tables/${table.ref.getTableId}"

val readSessionProto = ReadSession
.newBuilder()
.setDatasetId(table.ref.getDatasetId)
.setTableId(table.ref.getTableId)
.setProjectId(Option(table.ref.getProjectId).getOrElse(client.project))
.setTable(tableUrn)
.setReadOptions(readOptions)
.setDataFormat(DataFormat.AVRO)

val request = CreateReadSessionRequest
.newBuilder()
.setTableReference(tableRefProto)
.setReadOptions(readOptions)
.setParent(s"projects/${client.project}")
.setRequestedStreams(1)
.setFormat(DataFormat.AVRO)
.setReadSession(readSessionProto)
.setMaxStreamCount(1)
.build()

val session = client.storage.createReadSession(request)
val readRowsRequest = ReadRowsRequest
.newBuilder()
.setReadPosition(
StreamPosition
.newBuilder()
.setStream(session.getStreams(0))
)
.setReadStream(session.getStreams(0).getName)
.build()

val schema = new Schema.Parser().parse(session.getAvroSchema.getSchema)
Expand Down Expand Up @@ -137,18 +137,22 @@ final private[client] class TableOps(client: Client) {
Cache.SchemaCache
) {
val tableRef = bq.BigQueryHelpers.parseTableSpec(tableSpec)
val tableRefProto = TableReferenceProto.TableReference
val tableProjectId = Option(tableRef.getProjectId).getOrElse(client.project)
val tableUrn =
s"projects/${tableProjectId}/datasets/${tableRef.getDatasetId}/tables/${tableRef.getTableId}"

val readSessionProto = ReadSession
.newBuilder()
.setProjectId(Option(tableRef.getProjectId).getOrElse(client.project))
.setDatasetId(tableRef.getDatasetId)
.setTableId(tableRef.getTableId)
.setTable(tableUrn)
.setReadOptions(StorageUtil.tableReadOptions(selectedFields, rowRestriction))
.setDataFormat(DataFormat.AVRO)

val request = CreateReadSessionRequest
.newBuilder()
.setTableReference(tableRefProto)
.setReadOptions(StorageUtil.tableReadOptions(selectedFields, rowRestriction))
.setParent(s"projects/${client.project}")
.setReadSession(readSessionProto)
.build()

val session = client.storage.createReadSession(request)
new Schema.Parser().parse(session.getAvroSchema.getSchema)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.spotify.scio.bigquery

import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions
import com.google.api.services.bigquery.model.{TableReference, TableSchema}
import com.spotify.scio.ScioContext
import com.spotify.scio.avro._
Expand Down
Loading