Skip to content

Commit

Permalink
Merge pull request #84 from nrkno/feature/unpartitioned-bqfill
Browse files Browse the repository at this point in the history
feat: add partition type ot BQFill and BqFilledTable
  • Loading branch information
ingarabr authored Jun 5, 2023
2 parents 9b4983d + a849819 commit 4e9f436
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 23 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import com.typesafe.tools.mima.core._

// https://typelevel.org/sbt-typelevel/faq.html#what-is-a-base-version-anyway
ThisBuild / tlBaseVersion := "0.5" // your current series x.y
ThisBuild / tlBaseVersion := "0.6" // your current series x.y

ThisBuild / organization := "no.nrk.bigquery"
ThisBuild / organizationName := "NRK"
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/scala/no/nrk/bigquery/BQFill.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ import java.time.LocalDate
*/
trait JobKeyBQ

case class BQFill(
case class BQFill[+P](
jobKey: JobKeyBQ,
tableDef: BQTableDef.Table[LocalDate],
tableDef: BQTableDef.Table[P],
query: BQSqlFrag,
executionDate: LocalDate
) {
val destination: BQPartitionId[LocalDate] =
tableDef.assertPartition(executionDate)
partitionValue: P
)(implicit P: TableOps[P]) {
val destination: BQPartitionId[P] =
tableDef.assertPartition(partitionValue)

@deprecated("use partitionValue", "0.6.x")
def executionDate: P = partitionValue
}

case class BQFilledTable(
case class BQFilledTable[+P](
jobKey: JobKeyBQ,
tableDef: BQTableDef.Table[LocalDate],
tableDef: BQTableDef.Table[P],
query: LocalDate => BQSqlFrag
) {
def withDate(executionDate: LocalDate): BQFill =
BQFill(jobKey, tableDef, query(executionDate), executionDate)
}
)
14 changes: 8 additions & 6 deletions core/src/main/scala/no/nrk/bigquery/BQShow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@ trait BQShowInstances {

implicit def bqShowsBQPartitionIds[
I[t] <: Iterable[t],
Pid <: BQPartitionId[
Any
]]: BQShow[I[Pid]] =
Pid <: BQPartitionId[Any]
]: BQShow[I[Pid]] =
partitions => BQSqlFrag.Combined(partitions.map(bqShowsBQPartitionId[Pid].bqShow).toSeq)

implicit def bqShowTableLike[T <: BQTableLike[Unit]]: BQShow[T] =
Expand All @@ -83,13 +82,16 @@ trait BQShowInstances {
implicit def bqShowTablesLike[I[t] <: Iterable[t], T <: BQTableLike[Unit]]: BQShow[I[T]] =
tables => BQSqlFrag.Combined(tables.map(_.assertPartition.bqShow).toSeq)

implicit def bqShowFill: BQShow[BQFill] =
implicit def bqShowFill[Fill <: BQFill[Any]]: BQShow[Fill] =
BQSqlFrag.FillRef.apply

implicit def bqShowBQFilledTable: BQShow[BQFilledTable] =
implicit def bqShowBQFilledTable[Fill <: BQFilledTable[Any]]: BQShow[Fill] =
BQSqlFrag.FilledTableRef.apply

implicit def bqShowFills[I[t] <: Iterable[t]]: BQShow[I[BQFill]] =
implicit def bqShowFills[
I[t] <: Iterable[t],
Fill <: BQFill[Any]
]: BQShow[I[Fill]] =
fills => BQSqlFrag.Combined(fills.map(bqShowFill.bqShow).toSeq)

implicit def bqShowBQLimit[T <: BQLimit]: BQShow[T] = {
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/no/nrk/bigquery/BQSqlFrag.scala
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ object BQSqlFrag {
}
case class Combined(values: Seq[BQSqlFrag]) extends BQSqlFrag
case class PartitionRef(ref: BQPartitionId[Any]) extends BQSqlFrag
case class FillRef(fill: BQFill) extends BQSqlFrag
case class FilledTableRef(filledTable: BQFilledTable) extends BQSqlFrag

case class FillRef(fill: BQFill[Any]) extends BQSqlFrag
case class FilledTableRef(filledTable: BQFilledTable[Any]) extends BQSqlFrag

val Empty: BQSqlFrag = Frag("")

Expand Down
12 changes: 11 additions & 1 deletion core/src/main/scala/no/nrk/bigquery/internal/BQShowSyntax.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package no.nrk.bigquery.internal

import no.nrk.bigquery.{BQShow, BQSqlFrag}
import no.nrk.bigquery.{BQFill, BQFilledTable, BQShow, BQSqlFrag}
import cats.Foldable
import cats.syntax.all._

import java.time.LocalDate

trait BQShowSyntax {

implicit def bqShowInterpolator(sc: StringContext): BQShow.BQShowInterpolator = new BQShow.BQShowInterpolator(sc)
implicit def bqShowOps[A](a: A): BQShowOps[A] = new BQShowOps[A](a)
implicit def bqFragmentsOps[S[_]: Foldable, A](values: S[A]): FragmentsOps[S, A] = new FragmentsOps(values)
implicit def bqFilledTableLocalDateOps(fill: BQFilledTable[LocalDate]): BQFilledTableLocalDateOps =
new BQFilledTableLocalDateOps(fill)

}

Expand Down Expand Up @@ -45,3 +49,9 @@ class FragmentsOps[S[_]: Foldable, A](private val values: S[A]) {
BQSqlFrag.Combined(buf.result())
}
}

class BQFilledTableLocalDateOps(fill: BQFilledTable[LocalDate]) {

def withDate(partitionValue: LocalDate): BQFill[LocalDate] =
BQFill[LocalDate](fill.jobKey, fill.tableDef, fill.query(partitionValue), partitionValue)
}
94 changes: 94 additions & 0 deletions core/src/test/scala/no/nrk/bigquery/BQFillTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package no.nrk.bigquery

import com.google.cloud.bigquery.{Field, StandardSQLTypeName}
import munit.FunSuite
import no.nrk.bigquery.syntax._

import java.time.LocalDate

class BQFillTest extends FunSuite {
case class JobKey(value: String) extends JobKeyBQ

private def tableId(name: String) = BQTableId(BQDataset(ProjectId("p1"), "d1", None), name)

private val c1 = BQField("c1", StandardSQLTypeName.DATE, Field.Mode.REQUIRED)
private val source1: BQTableDef.Table[LocalDate] =
BQTableDef.Table(tableId("src_1"), BQSchema(List(c1)), BQPartitionType.DatePartitioned(c1.ident))
private val source2: BQTableDef.Table[LocalDate] =
BQTableDef.Table(tableId("src_2"), BQSchema(List(c1)), BQPartitionType.DatePartitioned(c1.ident))

test("using LocalDate partition fills") {
val executionDate = LocalDate.now()
val dest1: BQTableDef.Table[LocalDate] =
BQTableDef.Table(tableId("dest_1"), BQSchema(List(c1)), BQPartitionType.DatePartitioned(c1.ident))
val dest2: BQTableDef.Table[LocalDate] =
BQTableDef.Table(tableId("dest_2"), BQSchema(List(c1)), BQPartitionType.DatePartitioned(c1.ident))

val fillQuery1 = bqfr"select c1 from ${source1.assertPartition(executionDate)}"
val bqFill: BQFill[LocalDate] = BQFill(JobKey("j1"), dest1, fillQuery1, executionDate)

val fillQuery2 = bqfr"select c1 from ${source2.assertPartition(executionDate)}"
val bqFilledTable: BQFilledTable[LocalDate] = BQFilledTable(JobKey("j2"), dest2, _ => fillQuery2)

val query = bqfr"select c1 from $bqFill t1 join $bqFilledTable t2 on t1.${c1.ident} = t2${c1.ident}"
val (fillTables: List[BQTableId], fillJobKeys: List[JobKeyBQ]) = extractTableIdAndJobKeys(query)

assertEquals(fillJobKeys, List(JobKey("j1"), JobKey("j2")))
assertEquals(fillTables, List(tableId("dest_1"), tableId("dest_2")))
}

test("using unpartition fills") {
val dest1: BQTableDef.Table[Unit] =
BQTableDef.Table(tableId("dest_1"), BQSchema(List(c1)), BQPartitionType.NotPartitioned)
val dest2: BQTableDef.Table[Unit] =
BQTableDef.Table(tableId("dest_2"), BQSchema(List(c1)), BQPartitionType.NotPartitioned)

val fillQuery1 = bqfr"select c1 from ${source1.unpartitioned}"
val bqFill: BQFill[Unit] = BQFill(JobKey("j1"), dest1, fillQuery1, ())

val fillQuery2 = bqfr"select c1 from ${source2.unpartitioned}"
val bqFilledTable: BQFilledTable[Unit] = BQFilledTable(JobKey("j2"), dest2, _ => fillQuery2)

val query = bqfr"select c1 from $bqFill t1 join $bqFilledTable t2 on t1.${c1.ident} = t2${c1.ident}"
val (fillTables: List[BQTableId], fillJobKeys: List[JobKeyBQ]) = extractTableIdAndJobKeys(query)

assertEquals(fillJobKeys, List(JobKey("j1"), JobKey("j2")))
assertEquals(fillTables, List(tableId("dest_1"), tableId("dest_2")))
}

test("using mixed partition types in fills") {
val executionDate = LocalDate.now()
val dest1: BQTableDef.Table[LocalDate] =
BQTableDef.Table(tableId("dest_1"), BQSchema(List(c1)), BQPartitionType.DatePartitioned(c1.ident))
val dest2: BQTableDef.Table[Unit] =
BQTableDef.Table(tableId("dest_2"), BQSchema(List(c1)), BQPartitionType.NotPartitioned)

val fillQuery1 = bqfr"select c1 from ${source1.assertPartition(executionDate)}"
val bqFill: BQFill[LocalDate] = BQFill(JobKey("j1"), dest1, fillQuery1, executionDate)

val fillQuery2 = bqfr"select c1 from ${source2.unpartitioned}"
val bqFilledTable: BQFilledTable[Unit] = BQFilledTable(JobKey("j2"), dest2, _ => fillQuery2)

val query = bqfr"select c1 from $bqFill t1 join $bqFilledTable t2 on t1.${c1.ident} = t2${c1.ident}"
val (fillTables: List[BQTableId], fillJobKeys: List[JobKeyBQ]) = extractTableIdAndJobKeys(query)

assertEquals(fillJobKeys, List(JobKey("j1"), JobKey("j2")))
assertEquals(fillTables, List(tableId("dest_1"), tableId("dest_2")))
}

private def extractTableIdAndJobKeys(fragment: BQSqlFrag): (List[BQTableId], List[JobKeyBQ]) = {
val fillTables = fragment
.collect {
case BQSqlFrag.FillRef(fill) => fill.tableDef.tableId
case BQSqlFrag.FilledTableRef(fill) => fill.tableDef.tableId
}
.sortBy(_.asString)
val fillJobKeys = fragment
.collect {
case BQSqlFrag.FillRef(fill) => fill.jobKey
case BQSqlFrag.FilledTableRef(fill) => fill.jobKey
}
.sortBy(_.toString)
(fillTables, fillJobKeys)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ abstract class BQSmokeTest(testClient: Resource[IO, BigQueryClient[IO]]) extends

protected def bqCheckFill(
testName: String
)(fill: BQFill)(implicit loc: Location): Unit =
)(fill: BQFill[Any])(implicit loc: Location): Unit =
test(s"bqCheck: $testName".tag(TestTags.Generated)) {
bqCheckFragment(
testName,
Expand Down

0 comments on commit 4e9f436

Please sign in to comment.