Skip to content

Commit

Permalink
New BQSqlFrag.TableRef.
Browse files Browse the repository at this point in the history
Use view partitionId to assert partition on underlying TableRef, if view is date partitioned.
  • Loading branch information
HenningKoller committed Jul 12, 2023
1 parent 76bd5af commit e8f3946
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
36 changes: 28 additions & 8 deletions core/src/main/scala/no/nrk/bigquery/BQSqlFrag.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats.syntax.all._
import no.nrk.bigquery.syntax._
import no.nrk.bigquery.BQSqlFrag.asSubQuery

import java.time.LocalDate
import scala.annotation.tailrec

/* The result of building a BigQuery sql. The `Frag` part of the name was chosen because it can be a fragment and not a complete query */
Expand All @@ -18,6 +19,7 @@ sealed trait BQSqlFrag {
case x @ BQSqlFrag.PartitionRef(_) => x
case x @ BQSqlFrag.FillRef(_) => x
case x @ BQSqlFrag.FilledTableRef(_) => x
case x @ BQSqlFrag.TableRef(_) => x
}

final def ++(other: BQSqlFrag): BQSqlFrag =
Expand Down Expand Up @@ -55,6 +57,8 @@ sealed trait BQSqlFrag {
.PartitionRef(fill.tableDef.unpartitioned.assertPartition)
.asString

case BQSqlFrag.TableRef(table) => table.tableId.asString

case BQSqlFrag.PartitionRef(partitionId) =>
partitionId match {
case x @ BQPartitionId.MonthPartitioned(_, _) => x.asSubQuery.asString
Expand All @@ -81,6 +85,7 @@ sealed trait BQSqlFrag {
case BQSqlFrag.PartitionRef(_) => Nil
case BQSqlFrag.FillRef(_) => Nil
case BQSqlFrag.FilledTableRef(_) => Nil
case BQSqlFrag.TableRef(_) => Nil
}

def extractInnerBody(frag: BQSqlFrag): Option[BQSqlFrag] =
Expand Down Expand Up @@ -115,21 +120,35 @@ sealed trait BQSqlFrag {
final def allReferencedAsPartitions: Seq[BQPartitionId[Any]] =
allReferencedAsPartitions(expandAndExcludeViews = true)
final def allReferencedAsPartitions(expandAndExcludeViews: Boolean): Seq[BQPartitionId[Any]] = {
def pf: PartialFunction[BQSqlFrag, List[BQPartitionId[Any]]] = {
case BQSqlFrag.PartitionRef(ref) =>
ref.wholeTable match {
case tableDef: BQTableDef.View[_] if expandAndExcludeViews => tableDef.query.collect(pf).flatten
case _ => List(ref)
def pf(outerRef: Option[BQPartitionId[Any]]): PartialFunction[BQSqlFrag, List[BQPartitionId[Any]]] = {
case BQSqlFrag.PartitionRef(partitionRef) =>
partitionRef.wholeTable match {
case tableDef: BQTableDef.View[_] if expandAndExcludeViews =>
tableDef.query.collect(pf(Some(partitionRef))).flatten
case _ => List(partitionRef)
}

case BQSqlFrag.TableRef(table) =>
outerRef match {
case Some(partitionRef: BQPartitionId.DatePartitioned) => List(table.assertPartition(partitionRef.partition))
case _ => List(table.unpartitioned.assertPartition)
}

case BQSqlFrag.FillRef(fill) => List(fill.destination)
case BQSqlFrag.FilledTableRef(fill) => List(fill.tableDef.unpartitioned.assertPartition)
case BQSqlFrag.FilledTableRef(fill) =>
(fill.tableDef, outerRef) match {
case (t: BQTableDef.Table[LocalDate], Some(partitionRef: BQPartitionId.DatePartitioned)) =>
List(t.assertPartition(partitionRef.partition))
case _ => List(fill.tableDef.unpartitioned.assertPartition)
}
List(fill.tableDef.unpartitioned.assertPartition)
}

this.collect(pf).flatten.distinct
this.collect(pf(None)).flatten.distinct
}

final def allReferencedTables: Seq[BQTableLike[Any]] =
allReferencedAsPartitions(expandAndExcludeViews = true)
allReferencedAsPartitions
.map(_.wholeTable)
.filterNot(tableLike => tableLike.isInstanceOf[BQTableDef.View[_]])

Expand All @@ -155,6 +174,7 @@ object BQSqlFrag {
)
}
case class Combined(values: Seq[BQSqlFrag]) extends BQSqlFrag
case class TableRef(table: BQTableLike[LocalDate]) extends BQSqlFrag
case class PartitionRef(ref: BQPartitionId[Any]) extends BQSqlFrag

case class FillRef(fill: BQFill[Any]) extends BQSqlFrag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,9 @@ object BQSmokeTest {
(p, Nil)
}

case BQSqlFrag.TableRef(table) =>
recurse(table.unpartitioned.assertPartition.bqShow)

case BQSqlFrag.FilledTableRef(filledTable) =>
recurse(filledTable.tableDef.unpartitioned.assertPartition.bqShow)

Expand Down

0 comments on commit e8f3946

Please sign in to comment.