diff --git a/.gitignore b/.gitignore index 339dd15065..162b898b96 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ project/plugins/project/ .metals/ metals.sbt .vscode/ +*.iml # scio .bigquery/ diff --git a/build.sbt b/build.sbt index ec2e13db95..281ce71c6c 100644 --- a/build.sbt +++ b/build.sbt @@ -436,6 +436,19 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( // relax type hierarchy for batch stream ProblemFilters.exclude[IncompatibleMethTypeProblem]( "com.spotify.scio.grpc.GrpcBatchDoFn.asyncLookup" + ), + // BQ api v1 update + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "com.spotify.scio.bigquery.BigQueryStorageTap.*" + ), + ProblemFilters.exclude[IncompatibleMethTypeProblem]( + "com.spotify.scio.bigquery.BigQueryStorageTap.*" + ), + ProblemFilters.exclude[IncompatibleMethTypeProblem]( + "com.spotify.scio.bigquery.BigQueryTaps.*" + ), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "com.spotify.scio.bigquery.StorageUtil.tableReadOptions" ) ) @@ -979,7 +992,7 @@ lazy val `scio-google-cloud-platform` = project "com.google.api" % "gax-grpc" % gcpBom.key.value, "com.google.api-client" % "google-api-client" % gcpBom.key.value, "com.google.api.grpc" % "grpc-google-cloud-pubsub-v1" % gcpBom.key.value, - "com.google.api.grpc" % "proto-google-cloud-bigquerystorage-v1beta1" % gcpBom.key.value, + "com.google.api.grpc" % "proto-google-cloud-bigquerystorage-v1" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-bigtable-admin-v2" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-bigtable-v2" % gcpBom.key.value, "com.google.api.grpc" % "proto-google-cloud-datastore-v1" % gcpBom.key.value, diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala index 0f12f40135..42b2f514c0 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryStorageIT.scala @@ -17,7 +17,7 @@ package com.spotify.scio.bigquery.types -import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions +import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions import com.google.protobuf.ByteString import com.spotify.scio._ import com.spotify.scio.bigquery.BigQueryTaps._ diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/StorageUtil.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/StorageUtil.scala index ff6f306177..1581f7159e 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/StorageUtil.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/StorageUtil.scala @@ -18,7 +18,7 @@ package com.spotify.scio.bigquery import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema} -import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions +import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions import org.apache.avro.Schema import org.apache.avro.Schema.Type diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala index bebbadfba6..2c26de53de 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala @@ -31,7 +31,7 @@ import com.google.api.services.bigquery.model._ import com.google.auth.Credentials import com.google.auth.http.HttpCredentialsAdapter import com.google.auth.oauth2.{GoogleCredentials, ImpersonatedCredentials} -import com.google.cloud.bigquery.storage.v1beta1.{BigQueryStorageClient, BigQueryStorageSettings} +import com.google.cloud.bigquery.storage.v1.{BigQueryReadClient, BigQueryReadSettings} import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer import com.spotify.scio.bigquery.{Table => STable} import com.spotify.scio.bigquery.client.BigQuery.Client @@ -318,18 +318,18 @@ object BigQuery { .build() } - lazy val storage: BigQueryStorageClient = { - val settings = BigQueryStorageSettings + lazy val storage: BigQueryReadClient = { + val settings = BigQueryReadSettings .newBuilder() .setCredentialsProvider(FixedCredentialsProvider.create(credentials)) .setTransportChannelProvider( - BigQueryStorageSettings + BigQueryReadSettings .defaultGrpcTransportProviderBuilder() .setHeaderProvider(FixedHeaderProvider.create("user-agent", "scio")) .build() ) .build() - BigQueryStorageClient.create(settings) + BigQueryReadClient.create(settings) } } } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala index 5470947a42..5ee8972d7a 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala @@ -19,9 +19,11 @@ package com.spotify.scio.bigquery.client import com.google.api.client.googleapis.json.GoogleJsonResponseException import com.google.api.services.bigquery.model._ -import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions -import com.google.cloud.bigquery.storage.v1beta1.Storage._ -import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest +import com.google.cloud.bigquery.storage.v1.DataFormat +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest +import com.google.cloud.bigquery.storage.v1.ReadSession +import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions import com.google.cloud.hadoop.util.ApiErrorExtractor import com.spotify.scio.bigquery.client.BigQuery.Client import com.spotify.scio.bigquery.{BigQuerySysProps, StorageUtil, Table => STable, TableRow} @@ -72,29 +74,27 @@ final private[client] class TableOps(client: Client) { } def storageAvroRows(table: STable, readOptions: TableReadOptions): Iterator[GenericRecord] = { - val tableRefProto = TableReferenceProto.TableReference + val tableProjectId = Option(table.ref.getProjectId).getOrElse(client.project) + val tableUrn = + s"projects/${tableProjectId}/datasets/${table.ref.getDatasetId}/tables/${table.ref.getTableId}" + + val readSessionProto = ReadSession .newBuilder() - .setDatasetId(table.ref.getDatasetId) - .setTableId(table.ref.getTableId) - .setProjectId(Option(table.ref.getProjectId).getOrElse(client.project)) + .setTable(tableUrn) + .setReadOptions(readOptions) + .setDataFormat(DataFormat.AVRO) val request = CreateReadSessionRequest .newBuilder() - .setTableReference(tableRefProto) - .setReadOptions(readOptions) .setParent(s"projects/${client.project}") - .setRequestedStreams(1) - .setFormat(DataFormat.AVRO) + .setReadSession(readSessionProto) + .setMaxStreamCount(1) .build() val session = client.storage.createReadSession(request) val readRowsRequest = ReadRowsRequest .newBuilder() - .setReadPosition( - StreamPosition - .newBuilder() - .setStream(session.getStreams(0)) - ) + .setReadStream(session.getStreams(0).getName) .build() val schema = new Schema.Parser().parse(session.getAvroSchema.getSchema) @@ -137,18 +137,22 @@ final private[client] class TableOps(client: Client) { Cache.SchemaCache ) { val tableRef = bq.BigQueryHelpers.parseTableSpec(tableSpec) - val tableRefProto = TableReferenceProto.TableReference + val tableProjectId = Option(tableRef.getProjectId).getOrElse(client.project) + val tableUrn = + s"projects/${tableProjectId}/datasets/${tableRef.getDatasetId}/tables/${tableRef.getTableId}" + + val readSessionProto = ReadSession .newBuilder() - .setProjectId(Option(tableRef.getProjectId).getOrElse(client.project)) - .setDatasetId(tableRef.getDatasetId) - .setTableId(tableRef.getTableId) + .setTable(tableUrn) + .setReadOptions(StorageUtil.tableReadOptions(selectedFields, rowRestriction)) + .setDataFormat(DataFormat.AVRO) val request = CreateReadSessionRequest .newBuilder() - .setTableReference(tableRefProto) - .setReadOptions(StorageUtil.tableReadOptions(selectedFields, rowRestriction)) .setParent(s"projects/${client.project}") + .setReadSession(readSessionProto) .build() + val session = client.storage.createReadSession(request) new Schema.Parser().parse(session.getAvroSchema.getSchema) } diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala index 8c83dd24dd..acdeb28868 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala @@ -17,7 +17,7 @@ package com.spotify.scio.bigquery -import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions +import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions import com.google.api.services.bigquery.model.{TableReference, TableSchema} import com.spotify.scio.ScioContext import com.spotify.scio.avro._