From c9ef82282989e70a947f5a19e70ee9d3df0ff077 Mon Sep 17 00:00:00 2001
From: Erlend Hamnaberg
Date: Fri, 17 Nov 2023 10:05:26 +0100
Subject: [PATCH] BQJobName is insufficient

* Introduce BQJobId to give better control over where a BQ job runs
* Add an optional BQClientDefaults that supplies project and location
  defaults when a job does not set them on a per-job basis
* This is a breaking change
---
 build.sbt                                     |   2 +-
 .../no/nrk/bigquery/BQClientDefaults.scala    |   9 ++
 .../main/scala/no/nrk/bigquery/BQJobId.scala  |  23 ++++
 .../scala/no/nrk/bigquery/BQJobName.scala     |  17 +--
 .../no/nrk/bigquery/BigQueryClient.scala      | 121 ++++++++----------
 .../no/nrk/bigquery/PartitionLoader.scala     |  16 +--
 .../bigquery/internal/BQNameConversions.scala |  15 +++
 .../main/scala/no/nrk/bigquery/syntax.scala   |   4 +-
 .../no/nrk/bigquery/testing/BQSmokeTest.scala |   4 +-
 .../nrk/bigquery/testing/BQUdfSmokeTest.scala |   2 +-
 .../bigquery/testing/BigQueryTestClient.scala |  16 +--
 .../scala/no/nrk/bigquery/RoundtripTest.scala |   2 +-
 12 files changed, 126 insertions(+), 105 deletions(-)
 create mode 100644 core/src/main/scala/no/nrk/bigquery/BQClientDefaults.scala
 create mode 100644 core/src/main/scala/no/nrk/bigquery/BQJobId.scala
 create mode 100644 core/src/main/scala/no/nrk/bigquery/internal/BQNameConversions.scala

diff --git a/build.sbt b/build.sbt
index 5c3f259d..5a87b810 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,7 +1,7 @@
 import com.typesafe.tools.mima.core._
 
 // https://typelevel.org/sbt-typelevel/faq.html#what-is-a-base-version-anyway
-ThisBuild / tlBaseVersion := "0.12" // your current series x.y
+ThisBuild / tlBaseVersion := "0.13" // your current series x.y
 
 ThisBuild / organization := "no.nrk.bigquery"
 ThisBuild / organizationName := "NRK"
diff --git a/core/src/main/scala/no/nrk/bigquery/BQClientDefaults.scala b/core/src/main/scala/no/nrk/bigquery/BQClientDefaults.scala
new file mode 100644
index 00000000..f26232ac
--- /dev/null
+++ b/core/src/main/scala/no/nrk/bigquery/BQClientDefaults.scala
@@ -0,0 +1,9 @@
+/*
+ * Copyright 2020 NRK
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+package no.nrk.bigquery
+
+final case class BQClientDefaults(projectId: ProjectId, locationId: LocationId)
diff --git a/core/src/main/scala/no/nrk/bigquery/BQJobId.scala b/core/src/main/scala/no/nrk/bigquery/BQJobId.scala
new file mode 100644
index 00000000..a12fe2fe
--- /dev/null
+++ b/core/src/main/scala/no/nrk/bigquery/BQJobId.scala
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2020 NRK
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+package no.nrk.bigquery
+
+final case class BQJobId(projectId: Option[ProjectId], locationId: Option[LocationId], name: BQJobName) {
+  def withProjectId(projectId: ProjectId) = copy(projectId = Some(projectId))
+  def withLocationId(locationId: LocationId) = copy(locationId = Some(locationId))
+  private[bigquery] def withDefaults(defaults: Option[BQClientDefaults]) =
+    if (projectId.isEmpty && locationId.isEmpty) {
+      copy(projectId = defaults.map(_.projectId), locationId = defaults.map(_.locationId))
+    } else this
+
+  def +(str: String): BQJobId = copy(name = name + str)
+}
+
+object BQJobId {
+  def auto(implicit enclosing: sourcecode.Enclosing) =
+    BQJobId(None, None, BQJobName(enclosing.value))
+}
diff --git a/core/src/main/scala/no/nrk/bigquery/BQJobName.scala b/core/src/main/scala/no/nrk/bigquery/BQJobName.scala
index cad18a38..b1859368 100644
--- a/core/src/main/scala/no/nrk/bigquery/BQJobName.scala
+++ b/core/src/main/scala/no/nrk/bigquery/BQJobName.scala
@@ -6,25 +6,11 @@
 
 package no.nrk.bigquery
 
-import cats.effect.Sync
-import com.google.cloud.bigquery.JobId
-
-import java.util.UUID
-
 /** For getting an overview we tag all bigquery jobs (including queries) with a name so we can track the price and
   * duration of individual queries without manual inspection
   */
+//todo: consider merging with BQJobId
 case class BQJobName private (value: String) extends AnyVal {
-  def freshJobId[F[_]](
-      locationId: Option[LocationId]
-  )(implicit F: Sync[F]): F[JobId] =
-    F.delay(
-      JobId
-        .newBuilder()
-        .setJob(s"$value-${UUID.randomUUID}")
-        .setLocation(locationId.map(_.value).orNull)
-        .build()
-    )
   def +(str: String): BQJobName = BQJobName(value + str)
 }
 
@@ -33,6 +19,7 @@ object BQJobName {
   /** use a macro to automatically name a job based on the name of the context in which `auto` is called. A typical name
     * is `no_nrk_recommendations_datahub_ecommerce_ECommerceETL_bqFetchRowsForDate`
     */
+  @deprecated(message = "Use BQJobId.auto instead", since = "0.13.0")
   def auto(implicit enclosing: sourcecode.Enclosing): BQJobName =
     apply(enclosing.value)
 
diff --git a/core/src/main/scala/no/nrk/bigquery/BigQueryClient.scala b/core/src/main/scala/no/nrk/bigquery/BigQueryClient.scala
index 1ba549ea..66fb975b 100644
--- a/core/src/main/scala/no/nrk/bigquery/BigQueryClient.scala
+++ b/core/src/main/scala/no/nrk/bigquery/BigQueryClient.scala
@@ -43,7 +43,8 @@ import scala.jdk.CollectionConverters._
 class BigQueryClient[F[_]](
     bigQuery: BigQuery,
     val reader: BigQueryReadClient,
-    val metricOps: MetricsOps[F]
+    val metricOps: MetricsOps[F],
+    defaults: Option[BQClientDefaults]
 )(implicit F: Async[F], lf: LoggerFactory[F]) {
   private val logger = lf.getLogger
   private implicit def showJob[J <: JobInfo]: Show[J] = Show.show(Jsonify.job)
 
   def underlying: BigQuery = bigQuery
 
   def synchronousQuery[A](
-      jobName: BQJobName,
+      jobId: BQJobId,
       query: BQQuery[A]
   ): Stream[F, A] =
-    synchronousQuery(jobName, query, legacySql = false)
+    synchronousQuery(jobId, query, legacySql = false)
+
   def synchronousQuery[A](
-      jobName: BQJobName,
+      jobId: BQJobId,
       query: BQQuery[A],
       legacySql: Boolean
   ): Stream[F, A] =
-    synchronousQuery(jobName, query, legacySql, Nil)
+    synchronousQuery(jobId, query, legacySql, Nil)
 
   def synchronousQuery[A](
-      jobName: BQJobName,
+      jobId: BQJobId,
       query: BQQuery[A],
       legacySql: Boolean,
       jobOptions: Seq[JobOption]
   ): Stream[F, A] =
-    synchronousQuery(jobName, query, legacySql, jobOptions, logStream = true)
-
-  def synchronousQuery[A](
-      jobName: BQJobName,
-      query: BQQuery[A],
-      legacySql: Boolean,
-      jobOptions: Seq[JobOption],
-      logStream: Boolean
-  ): Stream[F, A] =
-    synchronousQuery(jobName, query, legacySql, jobOptions, logStream, None)
+    synchronousQuery(jobId, query, legacySql, jobOptions, logStream = true)
 
   /** Synchronous query to BQ.
     *
     *
     * }}}
     */
   def synchronousQuery[A](
-      jobName: BQJobName,
+      jobId: BQJobId,
       query: BQQuery[A],
       legacySql: Boolean,
       jobOptions: Seq[JobOption],
-      logStream: Boolean,
-      locationId: Option[LocationId]
+      logStream: Boolean
   ): Stream[F, A] =
     Stream
      .resource(
        synchronousQueryExecute(
-          jobName,
+          jobId,
          query.sql,
          legacySql,
          jobOptions,
-          logStream,
-          locationId
+          logStream
        )
      )
      .flatMap { case (_, rowStream) =>
@@ -123,12 +114,11 @@
   }
 
   protected def synchronousQueryExecute(
-      jobName: BQJobName,
+      jobId: BQJobId,
       query: BQSqlFrag,
       legacySql: Boolean,
       jobOptions: Seq[JobOption],
-      logStream: Boolean,
-      locationId: Option[LocationId]
+      logStream: Boolean
   ): Resource[F, (avro.Schema, Stream[F, GenericRecord])] = {
 
     val runQuery: F[Job] = {
       val queryRequest = QueryJobConfiguration
         .newBuilder(query.asStringWithUDFs)
         .setUseLegacySql(legacySql)
         .build
-      submitJob(jobName, locationId)(jobId =>
+      submitJob(jobId)(jobId =>
         F.blocking(
           Option(
             bigQuery.create(JobInfo.of(jobId, queryRequest), jobOptions: _*)
           )
         )).flatMap {
         case Some(job) => F.pure(job)
         case None =>
           F.raiseError(
-            new Exception(s"Unexpected: got no job after submitting $jobName")
+            new Exception(s"Unexpected: got no job after submitting ${jobId.name}")
           )
       }
     }
@@ -229,13 +219,13 @@
   }
 
   def loadJson[A: Encoder, P: TableOps](
-      jobName: BQJobName,
+      jobId: BQJobId,
       table: BQTableDef.Table[P],
       partition: P,
       stream: fs2.Stream[F, A],
       writeDisposition: WriteDisposition
   ): F[Option[LoadStatistics]] = loadJson(
-    jobName = jobName,
+    jobId = jobId,
     table = table,
     partition = partition,
     stream = stream,
     writeDisposition = writeDisposition,
     logStream = false
   )
@@ -244,7 +234,7 @@
 
   def loadJson[A: Encoder, P: TableOps](
-      jobName: BQJobName,
+      jobId: BQJobId,
       table: BQTableDef.Table[P],
       partition: P,
       stream: fs2.Stream[F, A],
       writeDisposition: WriteDisposition,
       logStream: Boolean
   ): F[Option[LoadStatistics]] =
     loadJson(
@@ -252,7 +242,7 @@
-      jobName = jobName,
+      jobId = jobId,
       table = table,
       partition = partition,
       stream = stream,
       writeDisposition = writeDisposition,
       logStream = logStream,
       chunkSize = 10 * StreamUtils.Megabyte
     )
@@ -265,7 +255,7 @@
   /** @return the [[LoadStatistics]] for the load job, or
     * None, if `chunkedStream` is empty
     */
   def loadJson[A: Encoder, P: TableOps](
-      jobName: BQJobName,
+      jobId: BQJobId,
       table: BQTableDef.Table[P],
       partition: P,
       stream: fs2.Stream[F, A],
@@ -273,7 +263,7 @@
       writeDisposition: WriteDisposition,
       logStream: Boolean,
       chunkSize: Int
   ): F[Option[LoadStatistics]] =
-    submitJob(jobName, table.tableId.dataset.location) { jobId =>
+    submitJob(jobId) { jobId =>
       val partitionId = table.assertPartition(partition)
       val formatOptions = FormatOptions.json()
       val schema = table.schema
@@ -345,49 +335,38 @@
       tmpDataset: BQDataset): Resource[F, BQTableDef.Table[Param]] =
     Resource.make(createTempTable(table, tmpDataset))(tmp => delete(tmp.tableId).attempt.void)
 
-  def submitQuery[P](jobName: BQJobName, query: BQSqlFrag): F[Job] =
-    submitQuery(jobName, query, None)
-
-  def submitQuery[P](
-      jobName: BQJobName,
-      query: BQSqlFrag,
-      locationId: Option[LocationId]
-  ): F[Job] = submitQuery(jobName, query, locationId, None)
+  def submitQuery[P](jobId: BQJobId, query: BQSqlFrag): F[Job] =
+    submitQuery(jobId, query, None)
 
   def submitQuery[P](
-      jobName: BQJobName,
+      id: BQJobId,
       query: BQSqlFrag,
-      locationId: Option[LocationId],
       destination: Option[BQPartitionId[P]]
   ): F[Job] =
-    submitQuery(jobName, query, locationId, destination, None)
+    submitQuery(id, query, destination, None)
 
   def submitQuery[P](
-      jobName: BQJobName,
+      id: BQJobId,
       query: BQSqlFrag,
-      locationId: Option[LocationId],
       destination: Option[BQPartitionId[P]],
       writeDisposition: Option[WriteDisposition]
   ): F[Job] =
     submitQuery(
-      jobName,
+      id,
       query,
-      locationId,
       destination,
       writeDisposition,
       None
     )
 
   def submitQuery[P](
-      jobName: BQJobName,
+      id: BQJobId,
       query: BQSqlFrag,
-      locationId: Option[LocationId],
       destination: Option[BQPartitionId[P]],
       writeDisposition: Option[WriteDisposition],
       timePartitioning: Option[TimePartitioning]
   ): F[Job] =
     submitQuery(
-      jobName,
+      id,
       query,
-      locationId,
       destination,
       writeDisposition,
       timePartitioning,
@@ -397,15 +376,14 @@
       Nil
     )
 
   /** Submit any SQL statement to BQ, perfect for BQ to BQ insertions or data mutation */
   def submitQuery[P](
-      jobName: BQJobName,
+      id: BQJobId,
       query: BQSqlFrag,
-      locationId: Option[LocationId],
       destination: Option[BQPartitionId[P]],
       writeDisposition: Option[WriteDisposition],
       timePartitioning: Option[TimePartitioning],
       jobOptions: Seq[JobOption]
   ): F[Job] =
-    submitJob(jobName, locationId) { jobId =>
+    submitJob(id) { jobId =>
       val jobConfiguration = {
         val b = QueryJobConfiguration.newBuilder(query.asStringWithUDFs)
         destination.foreach(partitionId => b.setDestinationTable(partitionId.asTableId.underlying))
@@ -423,13 +401,13 @@
       case Some(job) => F.pure(job)
       case None =>
         F.raiseError(
-          new Exception(s"Unexpected: got no job after submitting $jobName")
+          new Exception(s"Unexpected: got no job after submitting ${id.name}")
         )
 
   /** Submit a job to BQ, wait for it to finish, log results, track as dependency */
-  def submitJob(jobName: BQJobName, location: Option[LocationId])(
+  def submitJob(jobId: BQJobId)(
       runJob: JobId => F[Option[Job]]
   ): F[Option[Job]] = {
     val loggedJob: JobId => F[Option[Job]] = id =>
@@ -464,9 +442,8 @@
           F.pure(None)
       }
 
-    jobName
-      .freshJobId(location)
-      .flatMap(id => BQMetrics(metricOps, jobName)(loggedJob(id)))
+    freshJobId(jobId)
+      .flatMap(id => BQMetrics(metricOps, jobId.name)(loggedJob(id)))
   }
 
   def getTable(
@@ -496,11 +473,10 @@
   }
 
   def dryRun(
-      jobName: BQJobName,
-      query: BQSqlFrag,
-      location: Option[LocationId]
+      id: BQJobId,
+      query: BQSqlFrag
   ): F[Job] =
-    jobName.freshJobId(location).flatMap { jobId =>
+    freshJobId(id).flatMap { jobId =>
       val jobInfo = JobInfo.of(
         jobId,
         QueryJobConfiguration
@@ -573,6 +549,18 @@
   def delete(udfId: UDF.UDFId.PersistentId): F[Boolean] =
     F.blocking(bigQuery.delete(RoutineId.of(udfId.dataset.project.value, udfId.dataset.id, udfId.name.value)))
 
+  private def freshJobId(id: BQJobId): F[JobId] = {
+    val withDefaults = id.withDefaults(defaults)
+
+    F.delay(
+      JobId
+        .newBuilder()
+        .setJob(s"${withDefaults.name.value}-${UUID.randomUUID}")
+        .setLocation(withDefaults.locationId.map(_.value).orNull)
+        .setProject(withDefaults.projectId.map(_.value).orNull)
+        .build()
+    )
+  }
 }
 
 object BigQueryClient {
@@ -637,12 +625,13 @@
   def resource[F[_]: Async: LoggerFactory](
       credentials: Credentials,
       metricsOps: MetricsOps[F],
-      configure: Option[BigQueryOptions.Builder => BigQueryOptions.Builder] = None
+      configure: Option[BigQueryOptions.Builder => BigQueryOptions.Builder] = None,
+      clientDefaults: Option[BQClientDefaults] = None
   ): Resource[F, BigQueryClient[F]] =
     for {
       bq <- Resource.eval(
        BigQueryClient.fromCredentials(credentials, configure)
      )
       bqRead <- BigQueryClient.readerResource(credentials)
-    } yield new BigQueryClient(bq, bqRead, metricsOps)
+    } yield new BigQueryClient(bq, bqRead, metricsOps, clientDefaults)
 }
diff --git a/core/src/main/scala/no/nrk/bigquery/PartitionLoader.scala b/core/src/main/scala/no/nrk/bigquery/PartitionLoader.scala
index 61236905..71639ace 100644
--- a/core/src/main/scala/no/nrk/bigquery/PartitionLoader.scala
+++ b/core/src/main/scala/no/nrk/bigquery/PartitionLoader.scala
@@ -79,7 +79,7 @@ private[bigquery] object PartitionLoader {
       if (requireRowNums)
         client
           .synchronousQuery(
-            BQJobName.auto,
+            BQJobId.auto,
             rowCountQuery(table, field, startDate)
           )
           .compile
@@ -92,7 +92,7 @@
         case view: BQTableDef.View[LocalDate] =>
           client
             .synchronousQuery(
-              BQJobName.auto,
+              BQJobId.auto,
               allPartitionsQueries
                 .fromTableData[LocalDate](view.unpartitioned, field)
             )
@@ -100,7 +100,7 @@
         case _ =>
           client
             .synchronousQuery(
-              BQJobName.auto,
+              BQJobId.auto,
              allPartitionsQuery(table, startDate),
              legacySql = true
            )
@@ -171,7 +171,7 @@
       if (requireRowNums)
         client
           .synchronousQuery(
-            BQJobName.auto,
+            BQJobId.auto,
             rowCountQuery(table, field, start)
           )
           .compile
@@ -185,12 +185,12 @@
           val query = allPartitionsQueries
             .fromTableData[YearMonth](view.unpartitioned, field)
           client
-            .synchronousQuery(BQJobName.auto, query)
+            .synchronousQuery(BQJobId.auto, query)
             .map(partitionDate => (partitionDate, None, None))
         case _ =>
           client
             .synchronousQuery(
-              BQJobName.auto,
+              BQJobId.auto,
               allPartitionsQuery(table, start),
               legacySql = true
             )
@@ -261,7 +261,7 @@
   ): F[Vector[(BQPartitionId.Sharded, PartitionMetadata)]] =
     client
       .synchronousQuery(
-        BQJobName.auto,
+        BQJobId.auto,
         allPartitionsQuery(startDate, table),
         legacySql = true
       )
@@ -315,7 +315,7 @@
   ): F[Option[(BQPartitionId.NotPartitioned, PartitionMetadata)]] =
     client
       .synchronousQuery(
-        BQJobName.auto,
+        BQJobId.auto,
         partitionQuery(table.tableId),
         legacySql = true
       )
diff --git a/core/src/main/scala/no/nrk/bigquery/internal/BQNameConversions.scala b/core/src/main/scala/no/nrk/bigquery/internal/BQNameConversions.scala
new file mode 100644
index 00000000..0c338ba5
--- /dev/null
+++ b/core/src/main/scala/no/nrk/bigquery/internal/BQNameConversions.scala
@@ -0,0 +1,15 @@
+/*
+ * Copyright 2020 NRK
+ *
+ * SPDX-License-Identifier: MIT
+ */
+
+package no.nrk.bigquery.internal
+
+import no.nrk.bigquery.{BQJobId, BQJobName}
+
+trait BQNameConversions {
+
+  implicit def bqNameToBQJobId(name: BQJobName): BQJobId = BQJobId(None, None, name)
+
+}
diff --git a/core/src/main/scala/no/nrk/bigquery/syntax.scala b/core/src/main/scala/no/nrk/bigquery/syntax.scala
index b4066986..a1608dbc 100644
--- a/core/src/main/scala/no/nrk/bigquery/syntax.scala
+++ b/core/src/main/scala/no/nrk/bigquery/syntax.scala
@@ -6,6 +6,6 @@
 
 package no.nrk.bigquery
 
-import no.nrk.bigquery.internal.{BQLiteralSyntax, BQShowSyntax}
+import no.nrk.bigquery.internal.{BQLiteralSyntax, BQNameConversions, BQShowSyntax}
 
-object syntax extends BQLiteralSyntax with BQShowSyntax
+object syntax extends BQLiteralSyntax with BQShowSyntax with BQNameConversions
diff --git a/testing/src/main/scala/no/nrk/bigquery/testing/BQSmokeTest.scala b/testing/src/main/scala/no/nrk/bigquery/testing/BQSmokeTest.scala
index 4b306bab..b42039b6 100644
--- a/testing/src/main/scala/no/nrk/bigquery/testing/BQSmokeTest.scala
+++ b/testing/src/main/scala/no/nrk/bigquery/testing/BQSmokeTest.scala
@@ -259,7 +259,7 @@ object BQSmokeTest {
           s"Running $testName against BQ (could have been cached)"
         )
         val run = bqClient
-          .dryRun(BQJobName("smoketest"), staticFrag, None)
+          .dryRun(BQJobId(None, None, BQJobName("smoketest")), staticFrag)
           .map(job =>
             SchemaHelper.fromSchema(
               job.getStatistics[QueryStatistics]().getSchema
@@ -281,7 +281,7 @@ object BQSmokeTest {
         val log = logger.warn(s"Running $testName becase $notStaticBecause")
         val runCheck = bqClient
-          .dryRun(BQJobName("smoketest"), frag, None)
+          .dryRun(BQJobId(None, None, BQJobName("smoketest")), frag)
           .guaranteeCase {
             case Outcome.Errored(_) if checkType != CheckType.Failing =>
               IO(println(s"failed query: ${frag.asStringWithUDFs}"))
diff --git a/testing/src/main/scala/no/nrk/bigquery/testing/BQUdfSmokeTest.scala b/testing/src/main/scala/no/nrk/bigquery/testing/BQUdfSmokeTest.scala
index 314d8dd0..544c6048 100644
--- a/testing/src/main/scala/no/nrk/bigquery/testing/BQUdfSmokeTest.scala
+++ b/testing/src/main/scala/no/nrk/bigquery/testing/BQUdfSmokeTest.scala
@@ -79,7 +79,7 @@ object BQUdfSmokeTest {
         )
 
         val run = bqClient
-          .synchronousQuery(BQJobName("smoketest"), BQQuery[Json](query))
+          .synchronousQuery(BQJobId(None, None, BQJobName("smoketest")), BQQuery[Json](query))
           .compile
           .lastOrError
           .guaranteeCase {
diff --git a/testing/src/main/scala/no/nrk/bigquery/testing/BigQueryTestClient.scala b/testing/src/main/scala/no/nrk/bigquery/testing/BigQueryTestClient.scala
index a307b8a4..144bd932 100644
--- a/testing/src/main/scala/no/nrk/bigquery/testing/BigQueryTestClient.scala
+++ b/testing/src/main/scala/no/nrk/bigquery/testing/BigQueryTestClient.scala
@@ -65,32 +65,30 @@ object BigQueryTestClient {
       cacheFrom: Resource[IO, BigQueryClient[IO]]
   ): Resource[IO, BigQueryClient[IO]] =
     cacheFrom.map(client =>
-      new BigQueryClient(client.underlying, client.reader, client.metricOps) {
+      new BigQueryClient(client.underlying, client.reader, client.metricOps, None) {
         override protected def synchronousQueryExecute(
-            jobName: BQJobName,
+            jobId: BQJobId,
             query: BQSqlFrag,
             legacySql: Boolean,
             jobOptions: Seq[JobOption],
-            logStream: Boolean,
-            location: Option[LocationId]
+            logStream: Boolean
         ): Resource[IO, (avro.Schema, Stream[IO, GenericRecord])] = {
           val hash =
             java.util.Objects.hash(query, Boolean.box(legacySql), jobOptions)
           val hashedSchemaPath =
-            queryCachePath.resolve(s"${jobName.value}__$hash.json")
+            queryCachePath.resolve(s"${jobId.name.value}__$hash.json")
           val hashedRowsPath =
-            queryCachePath.resolve(s"${jobName.value}__$hash.avro")
+            queryCachePath.resolve(s"${jobId.name.value}__$hash.avro")
 
           def runAndStore: Resource[IO, (avro.Schema, Stream[IO, GenericRecord])] =
             for {
               tuple <- super
                 .synchronousQueryExecute(
-                  jobName,
+                  jobId,
                  query,
                  legacySql,
                  jobOptions,
-                  logStream,
-                  location
+                  logStream
                )
               (schema, rowStream) = tuple
               _ <- Resource.liftK(serializeSchema(hashedSchemaPath, schema))
diff --git a/testing/src/test/scala/no/nrk/bigquery/RoundtripTest.scala b/testing/src/test/scala/no/nrk/bigquery/RoundtripTest.scala
index a56689e6..8e0fe22b 100644
--- a/testing/src/test/scala/no/nrk/bigquery/RoundtripTest.scala
+++ b/testing/src/test/scala/no/nrk/bigquery/RoundtripTest.scala
@@ -26,7 +26,7 @@ class RoundtripTest extends CatsEffectSuite {
       .cachingClient(BigQueryTestClient.testClient)
       .use(
         _.synchronousQuery(
-          BQJobId.auto,
          roundtripQuery(expectedValues.toList)
        ).compile.toVector
      )
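
Usage sketch (editor's note, not part of the patch): this is how the pieces above
are meant to fit together, based only on the API shown in the diff. It assumes
ProjectId and LocationId can be constructed directly from strings, that the
implicit LoggerFactory[IO] required by BigQueryClient is in scope, and the
"my-project"/"billing-project"/"EU" values are hypothetical placeholders.

    import cats.effect.{IO, Resource}
    import com.google.auth.Credentials
    import no.nrk.bigquery._

    object BQJobIdUsage {

      // Client-wide defaults. Per BQJobId.withDefaults, these only kick in when a
      // job sets neither projectId nor locationId; a job that sets either one is
      // left untouched.
      def clientResource(
          credentials: Credentials,
          metricsOps: MetricsOps[IO]
      )(implicit lf: org.typelevel.log4cats.LoggerFactory[IO]): Resource[IO, BigQueryClient[IO]] =
        BigQueryClient.resource[IO](
          credentials,
          metricsOps,
          clientDefaults = Some(BQClientDefaults(ProjectId("my-project"), LocationId("EU")))
        )

      // A job that overrides where it runs. BQJobId.auto derives the job name from
      // the enclosing scope, exactly like the now-deprecated BQJobName.auto, and
      // the with* methods pin the job to a specific project and location.
      def countRows(client: BigQueryClient[IO], countQuery: BQQuery[Long]): IO[Vector[Long]] =
        client
          .synchronousQuery(
            BQJobId.auto
              .withProjectId(ProjectId("billing-project"))
              .withLocationId(LocationId("EU")),
            countQuery
          )
          .compile
          .toVector
    }

Existing call sites that still pass a BQJobName keep compiling after a source
change because `import no.nrk.bigquery.syntax._` now pulls in BQNameConversions,
which implicitly widens a BQJobName into a BQJobId with no project or location
set; such jobs then pick up the client defaults, which is why the change is
binary-breaking but mostly source-compatible.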