Skip to content

Commit

Permalink
MergeQuery should try to prune target table as much as possible
Browse files Browse the repository at this point in the history
  • Loading branch information
HenningKoller committed Nov 27, 2023
1 parent f5b2c38 commit 75b1bf8
Showing 1 changed file with 71 additions and 60 deletions.
131 changes: 71 additions & 60 deletions core/src/main/scala/no/nrk/bigquery/mergeQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,79 +6,90 @@

package no.nrk.bigquery

import cats.data.NonEmptyList
import no.nrk.bigquery.syntax.*
import no.nrk.bigquery.syntax._

import scala.annotation.tailrec

object mergeQuery {
def into[Pid <: BQPartitionId[Any]](
source: Pid,
target: Pid,
/** Merges `source` into `target`, where both are given as partition ids.
  *
  * Both ids are first widened to their whole tables. When both of those are table definitions the call is
  * forwarded to the table-definition overload of `into`; any other combination is rejected.
  *
  * @param source          partition id identifying the table to read from
  * @param target          partition id identifying the table to merge into
  * @param primaryKey      first column used to match source rows against target rows
  * @param morePrimaryKeys further columns of the match key
  * @return the SQL fragment produced by the table-definition overload
  */
def into[Pid <: BQPartitionId[Any]](
    source: Pid,
    target: Pid,
    primaryKey: Ident,
    morePrimaryKeys: Ident*
): BQSqlFrag = {
  val sourceTable = source.wholeTable
  val targetTable = target.wholeTable

  (sourceTable, targetTable) match {
    case (fromDef: BQTableDef[Any], toDef: BQTableDef[Any]) =>
      into(fromDef, toDef, primaryKey, morePrimaryKeys *)
    case _ =>
      // only table definitions can be merged; fail loudly for anything else
      sys.error(s"Cannot merge $sourceTable into $targetTable")
  }
}

def into[P](
source: BQTableDef[P],
target: BQTableDef[P],
primaryKey: Ident,
morePrimaryKeys: Ident*
): BQSqlFrag =
intoTargets(source, NonEmptyList.one(target), primaryKey, morePrimaryKeys)
// compare schemas without comments, since those may be dynamic and it doesn't matter anyway
if (BQType.fromBQSchema(source.schema) != BQType.fromBQSchema(target.schema)) {
sys.error(s"Cannot merge $source into $target")
} else {
val partitionField = target.partitionType match {
case BQPartitionType.DatePartitioned(field) => Some(field)
case BQPartitionType.RangePartitioned(field, _) => Some(field)
case _ => None
}

def intoTargets[Pid <: BQPartitionId[Any]](
source: Pid,
targets: NonEmptyList[Pid],
primaryKey: Ident,
morePrimaryKeys: Seq[Ident]
): BQSqlFrag = {
val targetTable = targets.head.wholeTable
val primaryKeys: Seq[Ident] = {
val partitionField: Option[Ident] =
targetTable.partitionType match {
case BQPartitionType.DatePartitioned(field) => Some(field)
case _ => None
}
val primaryKeys: Seq[Ident] =
List(
List(primaryKey),
morePrimaryKeys.toList,
partitionField.toList
).flatten.distinct

List(
List(primaryKey),
morePrimaryKeys.toList,
partitionField.toList
).flatten.distinct
}
val allFields: List[BQField] =
source.schema.fields

val allFields: List[BQField] =
(source.wholeTable, targetTable) match {
// compare schemas without comments, since those may be dynamic and it doesn't matter anyway
case (from: BQTableDef[Any], into: BQTableDef[Any])
if BQType
.fromBQSchema(from.schema) == BQType.fromBQSchema(into.schema) =>
from.schema.fields
case (from, into) =>
sys.error(s"Cannot merge $from into $into")
}
val allFieldNames: List[Ident] =
allFields.map(_.ident)

val allFieldNames: List[Ident] =
allFields.map(_.ident)
val isPrimaryKey = primaryKeys.toSet
val (declareStruct, prunePartitions) = partitionPruningFrags(source)

val isPrimaryKey = primaryKeys.toSet

// note: we need to specify whole table for target table. partition info will be inferred from source
bqsql"""
|MERGE ${targetTable.unpartitioned} AS T
|USING $source AS S
|ON ${primaryKeys.toList
.map(keyEqualsFragment(allFields))
.mkFragment("\n AND ")}
|AND ${targets.map(p => p.partitionQuery(Option("T."))).mkFragment("(", "OR", ")")}
|WHEN MATCHED THEN UPDATE SET
|${allFieldNames
.filterNot(isPrimaryKey)
.map(nonKey => bqfr" T.$nonKey = S.$nonKey")
.mkFragment(",\n")}
|WHEN NOT MATCHED THEN
| INSERT (
|${allFieldNames.map(field => bqfr" $field").mkFragment(",\n")}
| )
| VALUES (
|${allFieldNames.map(field => bqfr" S.$field").mkFragment(",\n")}
| )
bqsql"""
|$declareStruct
|
|MERGE ${target.wholeTable} AS T
|USING ${source.wholeTable} AS S
|ON ${primaryKeys.toList
.map(keyEqualsFragment(allFields))
.mkFragment("\n AND ")}
|AND $prunePartitions
|WHEN MATCHED THEN UPDATE SET
|${allFieldNames
.filterNot(isPrimaryKey)
.map(nonKey => bqfr" T.$nonKey = S.$nonKey")
.mkFragment(",\n")}
|WHEN NOT MATCHED THEN
| INSERT (
|${allFieldNames.map(field => bqfr" $field").mkFragment(",\n")}
| )
| VALUES (
|${allFieldNames.map(field => bqfr" S.$field").mkFragment(",\n")}
| )
""".stripMargin
}
}

/** Builds the two SQL fragments used to restrict how much of the merge target has to be considered.
  *
  * When `source` is partitioned by date, by month or by integer range:
  *   - the first fragment declares a variable `partitions` and sets it to the minimum and maximum value of
  *     the partition field found in the source's whole table;
  *   - the second fragment is a predicate limiting the target alias `T` to that closed interval.
  *
  * For every other partition type there is nothing to bound. The first fragment is then empty and the second
  * is the always-true predicate `TRUE`, so a caller that writes `AND <predicate>` still renders valid SQL
  * instead of a dangling `AND`.
  *
  * @param source table definition whose partition type selects the field to bound
  * @return `(declaration, predicate)`; the predicate is always safe to place after `AND`
  */
def partitionPruningFrags[P](source: BQTableDef[P]): (BQSqlFrag, BQSqlFrag) = {
  // partition field together with the SQL type used for the min/max variable
  val partitionColumn =
    source.partitionType match {
      case BQPartitionType.DatePartitioned(field) => Some((field, BQType.DATE))
      case BQPartitionType.MonthPartitioned(field) => Some((field, BQType.DATE))
      case BQPartitionType.RangePartitioned(field, _) => Some((field, BQType.INT64))
      case _ => None
    }

  partitionColumn match {
    case Some((field, fieldType)) =>
      val declareBounds =
        bqsql"""
               |DECLARE partitions STRUCT<minP $fieldType, maxP $fieldType>;
               |SET partitions = (SELECT STRUCT(MIN($field) AS minP , MAX($field) AS maxP) FROM ${source.wholeTable});
          """.stripMargin
      val withinBounds = bqsql"T.$field BETWEEN partitions.minP AND partitions.maxP"
      (declareBounds, withinBounds)

    case None =>
      // no partition field to bound: neutral predicate keeps `AND <predicate>` well-formed
      (BQSqlFrag.Empty, bqfr"TRUE")
  }
}

def keyEqualsFragment(
allFields: List[BQField]
Expand Down

0 comments on commit 75b1bf8

Please sign in to comment.