Skip to content

Commit

Permalink
Merge pull request #220 from nrkno/jobname-deprecation
Browse files Browse the repository at this point in the history
BQJobName is insufficient
  • Loading branch information
hamnis authored Nov 20, 2023
2 parents df9a725 + c9ef822 commit 2867b19
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 105 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import com.typesafe.tools.mima.core._

// https://typelevel.org/sbt-typelevel/faq.html#what-is-a-base-version-anyway
ThisBuild / tlBaseVersion := "0.12" // your current series x.y
ThisBuild / tlBaseVersion := "0.13" // your current series x.y

ThisBuild / organization := "no.nrk.bigquery"
ThisBuild / organizationName := "NRK"
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/no/nrk/bigquery/BQClientDefaults.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright 2020 NRK
*
* SPDX-License-Identifier: MIT
*/

package no.nrk.bigquery

/** Client-wide fallback project and location.
  *
  * A `BigQueryClient` may be constructed with an optional instance of this class; it is handed to
  * `BQJobId.withDefaults` when a job id is materialised.
  *
  * @param projectId
  *   project to fall back to
  * @param locationId
  *   location to fall back to
  */
final case class BQClientDefaults(
    projectId: ProjectId,
    locationId: LocationId
)
23 changes: 23 additions & 0 deletions core/src/main/scala/no/nrk/bigquery/BQJobId.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2020 NRK
*
* SPDX-License-Identifier: MIT
*/

package no.nrk.bigquery

/** Identifies a BigQuery job by name, optionally pinned to a project and/or a location.
  *
  * @param projectId
  *   project the job should be submitted to, if explicitly chosen
  * @param locationId
  *   location the job should run in, if explicitly chosen
  * @param name
  *   the job name
  */
final case class BQJobId(projectId: Option[ProjectId], locationId: Option[LocationId], name: BQJobName) {

  /** Returns a copy of this id pinned to the given project. */
  def withProjectId(projectId: ProjectId): BQJobId =
    copy(projectId = Some(projectId))

  /** Returns a copy of this id pinned to the given location. */
  def withLocationId(locationId: LocationId): BQJobId =
    copy(locationId = Some(locationId))

  /** Fills in project and location from the client defaults.
    *
    * The defaults are applied as a pair, and only when this id specifies ''neither'' a project nor a
    * location. As soon as one of the two is set explicitly, the id is returned untouched and the other
    * field stays `None`. When `defaults` is `None` the result is equal to this id.
    */
  private[bigquery] def withDefaults(defaults: Option[BQClientDefaults]): BQJobId =
    (projectId, locationId) match {
      case (None, None) =>
        copy(projectId = defaults.map(_.projectId), locationId = defaults.map(_.locationId))
      case _ =>
        this
    }

  /** Returns a copy of this id with `str` appended to the job name. */
  def +(str: String): BQJobId = copy(name = name + str)
}

object BQJobId {

  /** Creates a job id named after the enclosing definition at the call site, as reported by
    * `sourcecode.Enclosing`. Neither project nor location is set on the result.
    */
  def auto(implicit enclosing: sourcecode.Enclosing): BQJobId =
    BQJobId(projectId = None, locationId = None, name = BQJobName(enclosing.value))
}
17 changes: 2 additions & 15 deletions core/src/main/scala/no/nrk/bigquery/BQJobName.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,11 @@

package no.nrk.bigquery

import cats.effect.Sync
import com.google.cloud.bigquery.JobId

import java.util.UUID

/** For getting an overview we tag all bigquery jobs (including queries) with a name so we can track the price and
* duration of individual queries without manual inspection
*/
//todo: consider merging with BQJobId
case class BQJobName private (value: String) extends AnyVal {
def freshJobId[F[_]](
locationId: Option[LocationId]
)(implicit F: Sync[F]): F[JobId] =
F.delay(
JobId
.newBuilder()
.setJob(s"$value-${UUID.randomUUID}")
.setLocation(locationId.map(_.value).orNull)
.build()
)
def +(str: String): BQJobName = BQJobName(value + str)
}

Expand All @@ -33,6 +19,7 @@ object BQJobName {
/** use a macro to automatically name a job based on the name of the context in which `auto` is called. A typical name
* is `no_nrk_recommendations_datahub_ecommerce_ECommerceETL_bqFetchRowsForDate`
*/
@deprecated(message = "Use BQJobId.auto instead", since = "0.13.0")
def auto(implicit enclosing: sourcecode.Enclosing): BQJobName =
apply(enclosing.value)

Expand Down
121 changes: 55 additions & 66 deletions core/src/main/scala/no/nrk/bigquery/BigQueryClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,41 +43,34 @@ import scala.jdk.CollectionConverters._
class BigQueryClient[F[_]](
bigQuery: BigQuery,
val reader: BigQueryReadClient,
val metricOps: MetricsOps[F]
val metricOps: MetricsOps[F],
defaults: Option[BQClientDefaults]
)(implicit F: Async[F], lf: LoggerFactory[F]) {
private val logger = lf.getLogger
private implicit def showJob[J <: JobInfo]: Show[J] = Show.show(Jsonify.job)

def underlying: BigQuery = bigQuery

def synchronousQuery[A](
jobName: BQJobName,
jobId: BQJobId,
query: BQQuery[A]
): Stream[F, A] =
synchronousQuery(jobName, query, legacySql = false)
synchronousQuery(jobId, query, legacySql = false)

def synchronousQuery[A](
jobName: BQJobName,
jobId: BQJobId,
query: BQQuery[A],
legacySql: Boolean
): Stream[F, A] =
synchronousQuery(jobName, query, legacySql, Nil)
synchronousQuery(jobId, query, legacySql, Nil)

def synchronousQuery[A](
jobName: BQJobName,
jobId: BQJobId,
query: BQQuery[A],
legacySql: Boolean,
jobOptions: Seq[JobOption]
): Stream[F, A] =
synchronousQuery(jobName, query, legacySql, jobOptions, logStream = true)

def synchronousQuery[A](
jobName: BQJobName,
query: BQQuery[A],
legacySql: Boolean,
jobOptions: Seq[JobOption],
logStream: Boolean
): Stream[F, A] =
synchronousQuery(jobName, query, legacySql, jobOptions, logStream, None)
synchronousQuery(jobId, query, legacySql, jobOptions, logStream = true)

/** Synchronous query to BQ.
*
Expand All @@ -90,22 +83,20 @@ class BigQueryClient[F[_]](
* }}}
*/
def synchronousQuery[A](
jobName: BQJobName,
jobId: BQJobId,
query: BQQuery[A],
legacySql: Boolean,
jobOptions: Seq[JobOption],
logStream: Boolean,
locationId: Option[LocationId]
logStream: Boolean
): Stream[F, A] =
Stream
.resource(
synchronousQueryExecute(
jobName,
jobId,
query.sql,
legacySql,
jobOptions,
logStream,
locationId
logStream
)
)
.flatMap { case (_, rowStream) =>
Expand All @@ -123,20 +114,19 @@ class BigQueryClient[F[_]](
}

protected def synchronousQueryExecute(
jobName: BQJobName,
jobId: BQJobId,
query: BQSqlFrag,
legacySql: Boolean,
jobOptions: Seq[JobOption],
logStream: Boolean,
locationId: Option[LocationId]
logStream: Boolean
): Resource[F, (avro.Schema, Stream[F, GenericRecord])] = {

val runQuery: F[Job] = {
val queryRequest = QueryJobConfiguration
.newBuilder(query.asStringWithUDFs)
.setUseLegacySql(legacySql)
.build
submitJob(jobName, locationId)(jobId =>
submitJob(jobId)(jobId =>
F.blocking(
Option(
bigQuery.create(JobInfo.of(jobId, queryRequest), jobOptions: _*)
Expand All @@ -145,7 +135,7 @@ class BigQueryClient[F[_]](
case Some(job) => F.pure(job)
case None =>
F.raiseError(
new Exception(s"Unexpected: got no job after submitting $jobName")
new Exception(s"Unexpected: got no job after submitting ${jobId.name}")
)
}
}
Expand Down Expand Up @@ -229,13 +219,13 @@ class BigQueryClient[F[_]](
}

def loadJson[A: Encoder, P: TableOps](
jobName: BQJobName,
jobId: BQJobId,
table: BQTableDef.Table[P],
partition: P,
stream: fs2.Stream[F, A],
writeDisposition: WriteDisposition
): F[Option[LoadStatistics]] = loadJson(
jobName = jobName,
jobId = jobId,
table = table,
partition = partition,
stream = stream,
Expand All @@ -244,15 +234,15 @@ class BigQueryClient[F[_]](
)

def loadJson[A: Encoder, P: TableOps](
jobName: BQJobName,
jobId: BQJobId,
table: BQTableDef.Table[P],
partition: P,
stream: fs2.Stream[F, A],
writeDisposition: WriteDisposition,
logStream: Boolean
): F[Option[LoadStatistics]] =
loadJson(
jobName = jobName,
jobId = jobId,
table = table,
partition = partition,
stream = stream,
Expand All @@ -265,15 +255,15 @@ class BigQueryClient[F[_]](
* None, if `chunkedStream` is empty
*/
def loadJson[A: Encoder, P: TableOps](
jobName: BQJobName,
jobId: BQJobId,
table: BQTableDef.Table[P],
partition: P,
stream: fs2.Stream[F, A],
writeDisposition: WriteDisposition,
logStream: Boolean,
chunkSize: Int
): F[Option[LoadStatistics]] =
submitJob(jobName, table.tableId.dataset.location) { jobId =>
submitJob(jobId) { jobId =>
val partitionId = table.assertPartition(partition)
val formatOptions = FormatOptions.json()
val schema = table.schema
Expand Down Expand Up @@ -345,49 +335,38 @@ class BigQueryClient[F[_]](
tmpDataset: BQDataset): Resource[F, BQTableDef.Table[Param]] =
Resource.make(createTempTable(table, tmpDataset))(tmp => delete(tmp.tableId).attempt.void)

def submitQuery[P](jobName: BQJobName, query: BQSqlFrag): F[Job] =
submitQuery(jobName, query, None)

def submitQuery[P](
jobName: BQJobName,
query: BQSqlFrag,
locationId: Option[LocationId]
): F[Job] = submitQuery(jobName, query, locationId, None)
def submitQuery[P](jobId: BQJobId, query: BQSqlFrag): F[Job] =
submitQuery(jobId, query, None)

def submitQuery[P](
jobName: BQJobName,
id: BQJobId,
query: BQSqlFrag,
locationId: Option[LocationId],
destination: Option[BQPartitionId[P]]
): F[Job] =
submitQuery(jobName, query, locationId, destination, None)
submitQuery(id, query, destination, None)

def submitQuery[P](
jobName: BQJobName,
id: BQJobId,
query: BQSqlFrag,
locationId: Option[LocationId],
destination: Option[BQPartitionId[P]],
writeDisposition: Option[WriteDisposition]
): F[Job] = submitQuery(
jobName,
id,
query,
locationId,
destination,
writeDisposition,
None
)

def submitQuery[P](
jobName: BQJobName,
id: BQJobId,
query: BQSqlFrag,
locationId: Option[LocationId],
destination: Option[BQPartitionId[P]],
writeDisposition: Option[WriteDisposition],
timePartitioning: Option[TimePartitioning]
): F[Job] = submitQuery(
jobName,
id,
query,
locationId,
destination,
writeDisposition,
timePartitioning,
Expand All @@ -397,15 +376,14 @@ class BigQueryClient[F[_]](
/** Submit any SQL statement to BQ, perfect for BQ to BQ insertions or data mutation
*/
def submitQuery[P](
jobName: BQJobName,
id: BQJobId,
query: BQSqlFrag,
locationId: Option[LocationId],
destination: Option[BQPartitionId[P]],
writeDisposition: Option[WriteDisposition],
timePartitioning: Option[TimePartitioning],
jobOptions: Seq[JobOption]
): F[Job] =
submitJob(jobName, locationId) { jobId =>
submitJob(id) { jobId =>
val jobConfiguration = {
val b = QueryJobConfiguration.newBuilder(query.asStringWithUDFs)
destination.foreach(partitionId => b.setDestinationTable(partitionId.asTableId.underlying))
Expand All @@ -423,13 +401,13 @@ class BigQueryClient[F[_]](
case Some(job) => F.pure(job)
case None =>
F.raiseError(
new Exception(s"Unexpected: got no job after submitting $jobName")
new Exception(s"Unexpected: got no job after submitting ${id.name}")
)
}

/** Submit a job to BQ, wait for it to finish, log results, track as dependency
*/
def submitJob(jobName: BQJobName, location: Option[LocationId])(
def submitJob(jobId: BQJobId)(
runJob: JobId => F[Option[Job]]
): F[Option[Job]] = {
val loggedJob: JobId => F[Option[Job]] = id =>
Expand Down Expand Up @@ -464,9 +442,8 @@ class BigQueryClient[F[_]](
F.pure(None)
}

jobName
.freshJobId(location)
.flatMap(id => BQMetrics(metricOps, jobName)(loggedJob(id)))
freshJobId(jobId)
.flatMap(id => BQMetrics(metricOps, jobId.name)(loggedJob(id)))
}

def getTable(
Expand Down Expand Up @@ -496,11 +473,10 @@ class BigQueryClient[F[_]](
}

def dryRun(
jobName: BQJobName,
query: BQSqlFrag,
location: Option[LocationId]
id: BQJobId,
query: BQSqlFrag
): F[Job] =
jobName.freshJobId(location).flatMap { jobId =>
freshJobId(id).flatMap { jobId =>
val jobInfo = JobInfo.of(
jobId,
QueryJobConfiguration
Expand Down Expand Up @@ -573,6 +549,18 @@ class BigQueryClient[F[_]](
def delete(udfId: UDF.UDFId.PersistentId): F[Boolean] =
F.blocking(bigQuery.delete(RoutineId.of(udfId.dataset.project.value, udfId.dataset.id, udfId.name.value)))

/** Builds a new Google `JobId` for the given [[BQJobId]].
  *
  * The client `defaults` are applied to `id` first (see `BQJobId.withDefaults`). The job string is
  * the job name followed by `-` and a random UUID, generated inside the suspended effect so that every
  * evaluation yields a different id. A missing location or project is passed to the builder as `null`.
  */
private def freshJobId(id: BQJobId): F[JobId] = {
  // Pure resolution of defaults; done once, outside the suspended effect.
  val resolved = id.withDefaults(defaults)

  F.delay {
    val uniqueJobName = s"${resolved.name.value}-${UUID.randomUUID}"
    JobId
      .newBuilder()
      .setJob(uniqueJobName)
      .setLocation(resolved.locationId.map(_.value).orNull)
      .setProject(resolved.projectId.map(_.value).orNull)
      .build()
  }
}
}

object BigQueryClient {
Expand Down Expand Up @@ -637,12 +625,13 @@ object BigQueryClient {
def resource[F[_]: Async: LoggerFactory](
credentials: Credentials,
metricsOps: MetricsOps[F],
configure: Option[BigQueryOptions.Builder => BigQueryOptions.Builder] = None
configure: Option[BigQueryOptions.Builder => BigQueryOptions.Builder] = None,
clientDefaults: Option[BQClientDefaults] = None
): Resource[F, BigQueryClient[F]] =
for {
bq <- Resource.eval(
BigQueryClient.fromCredentials(credentials, configure)
)
bqRead <- BigQueryClient.readerResource(credentials)
} yield new BigQueryClient(bq, bqRead, metricsOps)
} yield new BigQueryClient(bq, bqRead, metricsOps, clientDefaults)
}
Loading

0 comments on commit 2867b19

Please sign in to comment.