diff --git a/build.sbt b/build.sbt index d436d25c..50f13328 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ import com.typesafe.tools.mima.core._ // https://typelevel.org/sbt-typelevel/faq.html#what-is-a-base-version-anyway -ThisBuild / tlBaseVersion := "0.5" // your current series x.y +ThisBuild / tlBaseVersion := "0.6" // your current series x.y ThisBuild / organization := "no.nrk.bigquery" ThisBuild / organizationName := "NRK" diff --git a/core/src/main/scala/no/nrk/bigquery/BQFill.scala b/core/src/main/scala/no/nrk/bigquery/BQFill.scala index ff9da57b..f0d38ee1 100644 --- a/core/src/main/scala/no/nrk/bigquery/BQFill.scala +++ b/core/src/main/scala/no/nrk/bigquery/BQFill.scala @@ -6,21 +6,21 @@ import java.time.LocalDate */ trait JobKeyBQ -case class BQFill( +case class BQFill[+P]( jobKey: JobKeyBQ, - tableDef: BQTableDef.Table[LocalDate], + tableDef: BQTableDef.Table[P], query: BQSqlFrag, - executionDate: LocalDate -) { - val destination: BQPartitionId[LocalDate] = - tableDef.assertPartition(executionDate) + partitionValue: P +)(implicit P: TableOps[P]) { + val destination: BQPartitionId[P] = + tableDef.assertPartition(partitionValue) + + @deprecated("use partitionValue", "0.6.x") + def executionDate: P = partitionValue } -case class BQFilledTable( +case class BQFilledTable[+P]( jobKey: JobKeyBQ, - tableDef: BQTableDef.Table[LocalDate], + tableDef: BQTableDef.Table[P], query: LocalDate => BQSqlFrag -) { - def withDate(executionDate: LocalDate): BQFill = - BQFill(jobKey, tableDef, query(executionDate), executionDate) -} +) diff --git a/core/src/main/scala/no/nrk/bigquery/BQShow.scala b/core/src/main/scala/no/nrk/bigquery/BQShow.scala index bc7c836b..724cb2f1 100644 --- a/core/src/main/scala/no/nrk/bigquery/BQShow.scala +++ b/core/src/main/scala/no/nrk/bigquery/BQShow.scala @@ -72,9 +72,8 @@ trait BQShowInstances { implicit def bqShowsBQPartitionIds[ I[t] <: Iterable[t], - Pid <: BQPartitionId[ - Any - ]]: BQShow[I[Pid]] = + Pid <: BQPartitionId[Any] + ]: BQShow[I[Pid]] = partitions => BQSqlFrag.Combined(partitions.map(bqShowsBQPartitionId[Pid].bqShow).toSeq) implicit def bqShowTableLike[T <: BQTableLike[Unit]]: BQShow[T] = @@ -83,13 +82,16 @@ trait BQShowInstances { implicit def bqShowTablesLike[I[t] <: Iterable[t], T <: BQTableLike[Unit]]: BQShow[I[T]] = tables => BQSqlFrag.Combined(tables.map(_.assertPartition.bqShow).toSeq) - implicit def bqShowFill: BQShow[BQFill] = + implicit def bqShowFill[Fill <: BQFill[Any]]: BQShow[Fill] = BQSqlFrag.FillRef.apply - implicit def bqShowBQFilledTable: BQShow[BQFilledTable] = + implicit def bqShowBQFilledTable[Fill <: BQFilledTable[Any]]: BQShow[Fill] = BQSqlFrag.FilledTableRef.apply - implicit def bqShowFills[I[t] <: Iterable[t]]: BQShow[I[BQFill]] = + implicit def bqShowFills[ + I[t] <: Iterable[t], + Fill <: BQFill[Any] + ]: BQShow[I[Fill]] = fills => BQSqlFrag.Combined(fills.map(bqShowFill.bqShow).toSeq) implicit def bqShowBQLimit[T <: BQLimit]: BQShow[T] = { diff --git a/core/src/main/scala/no/nrk/bigquery/BQSqlFrag.scala b/core/src/main/scala/no/nrk/bigquery/BQSqlFrag.scala index 46778447..e42f7d0c 100644 --- a/core/src/main/scala/no/nrk/bigquery/BQSqlFrag.scala +++ b/core/src/main/scala/no/nrk/bigquery/BQSqlFrag.scala @@ -138,8 +138,9 @@ object BQSqlFrag { } case class Combined(values: Seq[BQSqlFrag]) extends BQSqlFrag case class PartitionRef(ref: BQPartitionId[Any]) extends BQSqlFrag - case class FillRef(fill: BQFill) extends BQSqlFrag - case class FilledTableRef(filledTable: BQFilledTable) extends BQSqlFrag + + case class FillRef(fill: BQFill[Any]) extends BQSqlFrag + case class FilledTableRef(filledTable: BQFilledTable[Any]) extends BQSqlFrag val Empty: BQSqlFrag = Frag("") diff --git a/core/src/main/scala/no/nrk/bigquery/internal/BQShowSyntax.scala b/core/src/main/scala/no/nrk/bigquery/internal/BQShowSyntax.scala index 7f5eb0f9..784fd69c 100644 --- a/core/src/main/scala/no/nrk/bigquery/internal/BQShowSyntax.scala +++ b/core/src/main/scala/no/nrk/bigquery/internal/BQShowSyntax.scala @@ -1,14 +1,18 @@ package no.nrk.bigquery.internal -import no.nrk.bigquery.{BQShow, BQSqlFrag} +import no.nrk.bigquery.{BQFill, BQFilledTable, BQShow, BQSqlFrag} import cats.Foldable import cats.syntax.all._ +import java.time.LocalDate + trait BQShowSyntax { implicit def bqShowInterpolator(sc: StringContext): BQShow.BQShowInterpolator = new BQShow.BQShowInterpolator(sc) implicit def bqShowOps[A](a: A): BQShowOps[A] = new BQShowOps[A](a) implicit def bqFragmentsOps[S[_]: Foldable, A](values: S[A]): FragmentsOps[S, A] = new FragmentsOps(values) + implicit def bqFilledTableLocalDateOps(fill: BQFilledTable[LocalDate]): BQFilledTableLocalDateOps = + new BQFilledTableLocalDateOps(fill) } @@ -45,3 +49,9 @@ class FragmentsOps[S[_]: Foldable, A](private val values: S[A]) { BQSqlFrag.Combined(buf.result()) } } + +class BQFilledTableLocalDateOps(fill: BQFilledTable[LocalDate]) { + + def withDate(partitionValue: LocalDate): BQFill[LocalDate] = + BQFill[LocalDate](fill.jobKey, fill.tableDef, fill.query(partitionValue), partitionValue) +} diff --git a/core/src/test/scala/no/nrk/bigquery/BQFillTest.scala b/core/src/test/scala/no/nrk/bigquery/BQFillTest.scala new file mode 100644 index 00000000..eb45f6ce --- /dev/null +++ b/core/src/test/scala/no/nrk/bigquery/BQFillTest.scala @@ -0,0 +1,94 @@ +package no.nrk.bigquery + +import com.google.cloud.bigquery.{Field, StandardSQLTypeName} +import munit.FunSuite +import no.nrk.bigquery.syntax._ + +import java.time.LocalDate + +class BQFillTest extends FunSuite { + case class JobKey(value: String) extends JobKeyBQ + + private def tableId(name: String) = BQTableId(BQDataset(ProjectId("p1"), "d1", None), name) + + private val c1 = BQField("c1", StandardSQLTypeName.DATE, Field.Mode.REQUIRED) + private val source1: BQTableDef.Table[LocalDate] = + BQTableDef.Table(tableId("src_1"), BQSchema(List(c1)), BQPartitionType.DatePartitioned(c1.ident)) + private val source2: BQTableDef.Table[LocalDate] = + BQTableDef.Table(tableId("src_2"), BQSchema(List(c1)), BQPartitionType.DatePartitioned(c1.ident)) + + test("using LocalDate partition fills") { + val executionDate = LocalDate.now() + val dest1: BQTableDef.Table[LocalDate] = + BQTableDef.Table(tableId("dest_1"), BQSchema(List(c1)), BQPartitionType.DatePartitioned(c1.ident)) + val dest2: BQTableDef.Table[LocalDate] = + BQTableDef.Table(tableId("dest_2"), BQSchema(List(c1)), BQPartitionType.DatePartitioned(c1.ident)) + + val fillQuery1 = bqfr"select c1 from ${source1.assertPartition(executionDate)}" + val bqFill: BQFill[LocalDate] = BQFill(JobKey("j1"), dest1, fillQuery1, executionDate) + + val fillQuery2 = bqfr"select c1 from ${source2.assertPartition(executionDate)}" + val bqFilledTable: BQFilledTable[LocalDate] = BQFilledTable(JobKey("j2"), dest2, _ => fillQuery2) + + val query = bqfr"select c1 from $bqFill t1 join $bqFilledTable t2 on t1.${c1.ident} = t2${c1.ident}" + val (fillTables: List[BQTableId], fillJobKeys: List[JobKeyBQ]) = extractTableIdAndJobKeys(query) + + assertEquals(fillJobKeys, List(JobKey("j1"), JobKey("j2"))) + assertEquals(fillTables, List(tableId("dest_1"), tableId("dest_2"))) + } + + test("using unpartition fills") { + val dest1: BQTableDef.Table[Unit] = + BQTableDef.Table(tableId("dest_1"), BQSchema(List(c1)), BQPartitionType.NotPartitioned) + val dest2: BQTableDef.Table[Unit] = + BQTableDef.Table(tableId("dest_2"), BQSchema(List(c1)), BQPartitionType.NotPartitioned) + + val fillQuery1 = bqfr"select c1 from ${source1.unpartitioned}" + val bqFill: BQFill[Unit] = BQFill(JobKey("j1"), dest1, fillQuery1, ()) + + val fillQuery2 = bqfr"select c1 from ${source2.unpartitioned}" + val bqFilledTable: BQFilledTable[Unit] = BQFilledTable(JobKey("j2"), dest2, _ => fillQuery2) + + val query = bqfr"select c1 from $bqFill t1 join $bqFilledTable t2 on t1.${c1.ident} = t2${c1.ident}" + val (fillTables: List[BQTableId], fillJobKeys: List[JobKeyBQ]) = extractTableIdAndJobKeys(query) + + assertEquals(fillJobKeys, List(JobKey("j1"), JobKey("j2"))) + assertEquals(fillTables, List(tableId("dest_1"), tableId("dest_2"))) + } + + test("using mixed partition types in fills") { + val executionDate = LocalDate.now() + val dest1: BQTableDef.Table[LocalDate] = + BQTableDef.Table(tableId("dest_1"), BQSchema(List(c1)), BQPartitionType.DatePartitioned(c1.ident)) + val dest2: BQTableDef.Table[Unit] = + BQTableDef.Table(tableId("dest_2"), BQSchema(List(c1)), BQPartitionType.NotPartitioned) + + val fillQuery1 = bqfr"select c1 from ${source1.assertPartition(executionDate)}" + val bqFill: BQFill[LocalDate] = BQFill(JobKey("j1"), dest1, fillQuery1, executionDate) + + val fillQuery2 = bqfr"select c1 from ${source2.unpartitioned}" + val bqFilledTable: BQFilledTable[Unit] = BQFilledTable(JobKey("j2"), dest2, _ => fillQuery2) + + val query = bqfr"select c1 from $bqFill t1 join $bqFilledTable t2 on t1.${c1.ident} = t2${c1.ident}" + val (fillTables: List[BQTableId], fillJobKeys: List[JobKeyBQ]) = extractTableIdAndJobKeys(query) + + assertEquals(fillJobKeys, List(JobKey("j1"), JobKey("j2"))) + assertEquals(fillTables, List(tableId("dest_1"), tableId("dest_2"))) + } + + private def extractTableIdAndJobKeys(fragment: BQSqlFrag): (List[BQTableId], List[JobKeyBQ]) = { + val fillTables = fragment + .collect { + case BQSqlFrag.FillRef(fill) => fill.tableDef.tableId + case BQSqlFrag.FilledTableRef(fill) => fill.tableDef.tableId + } + .sortBy(_.asString) + val fillJobKeys = fragment + .collect { + case BQSqlFrag.FillRef(fill) => fill.jobKey + case BQSqlFrag.FilledTableRef(fill) => fill.jobKey + } + .sortBy(_.toString) + (fillTables, fillJobKeys) + } +} diff --git a/testing/src/main/scala/no/nrk/bigquery/testing/BQSmokeTest.scala b/testing/src/main/scala/no/nrk/bigquery/testing/BQSmokeTest.scala index 11934a88..0def4884 100644 --- a/testing/src/main/scala/no/nrk/bigquery/testing/BQSmokeTest.scala +++ b/testing/src/main/scala/no/nrk/bigquery/testing/BQSmokeTest.scala @@ -162,7 +162,7 @@ abstract class BQSmokeTest(testClient: Resource[IO, BigQueryClient[IO]]) extends protected def bqCheckFill( testName: String - )(fill: BQFill)(implicit loc: Location): Unit = + )(fill: BQFill[Any])(implicit loc: Location): Unit = test(s"bqCheck: $testName".tag(TestTags.Generated)) { bqCheckFragment( testName,