Skip to content

Commit

Permalink
New BQSqlFrag.TableRef (#120)
Browse files Browse the repository at this point in the history
* New BQSqlFrag.TableRef.
Use view partitionId to assert partition on underlying TableRef, if view is date partitioned.

* New case class WholeTable, so we can reference a table and keep partitioning info in query.

* Match on BQPartitionType instead

* BQSqlFrag.TableRef.asString should produce tableId with backticks

* Remove FilledTableRef base case
  • Loading branch information
HenningKoller authored Jul 17, 2023
1 parent 76bd5af commit 8bb5d5f
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 8 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/no/nrk/bigquery/BQShow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ trait BQShowInstances {
implicit def bqShowTablesLike[I[t] <: Iterable[t], T <: BQTableLike[Unit]]: BQShow[I[T]] =
tables => BQSqlFrag.Combined(tables.map(_.assertPartition.bqShow).toSeq)

implicit def bqShowWholeTable[P]: BQShow[WholeTable[P]] = x => BQSqlFrag.TableRef(x.table)

implicit def bqShowFill[Fill <: BQFill[Any]]: BQShow[Fill] =
BQSqlFrag.FillRef.apply

Expand Down
35 changes: 27 additions & 8 deletions core/src/main/scala/no/nrk/bigquery/BQSqlFrag.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ sealed trait BQSqlFrag {
case x @ BQSqlFrag.PartitionRef(_) => x
case x @ BQSqlFrag.FillRef(_) => x
case x @ BQSqlFrag.FilledTableRef(_) => x
case x @ BQSqlFrag.TableRef(_) => x
}

final def ++(other: BQSqlFrag): BQSqlFrag =
Expand Down Expand Up @@ -55,6 +56,8 @@ sealed trait BQSqlFrag {
.PartitionRef(fill.tableDef.unpartitioned.assertPartition)
.asString

case BQSqlFrag.TableRef(table) => table.tableId.asFragment.asString

case BQSqlFrag.PartitionRef(partitionId) =>
partitionId match {
case x @ BQPartitionId.MonthPartitioned(_, _) => x.asSubQuery.asString
Expand All @@ -81,6 +84,7 @@ sealed trait BQSqlFrag {
case BQSqlFrag.PartitionRef(_) => Nil
case BQSqlFrag.FillRef(_) => Nil
case BQSqlFrag.FilledTableRef(_) => Nil
case BQSqlFrag.TableRef(_) => Nil
}

def extractInnerBody(frag: BQSqlFrag): Option[BQSqlFrag] =
Expand Down Expand Up @@ -115,21 +119,35 @@ sealed trait BQSqlFrag {
final def allReferencedAsPartitions: Seq[BQPartitionId[Any]] =
allReferencedAsPartitions(expandAndExcludeViews = true)
final def allReferencedAsPartitions(expandAndExcludeViews: Boolean): Seq[BQPartitionId[Any]] = {
def pf: PartialFunction[BQSqlFrag, List[BQPartitionId[Any]]] = {
case BQSqlFrag.PartitionRef(ref) =>
ref.wholeTable match {
case tableDef: BQTableDef.View[_] if expandAndExcludeViews => tableDef.query.collect(pf).flatten
case _ => List(ref)
def pf(outerRef: Option[BQPartitionId[Any]]): PartialFunction[BQSqlFrag, List[BQPartitionId[Any]]] = {
case BQSqlFrag.PartitionRef(partitionRef) =>
partitionRef.wholeTable match {
case tableDef: BQTableDef.View[_] if expandAndExcludeViews =>
tableDef.query.collect(pf(Some(partitionRef))).flatten
case _ => List(partitionRef)
}

case BQSqlFrag.TableRef(table) =>
(table.partitionType, outerRef) match {
case (partitionType: BQPartitionType.DatePartitioned, Some(partitionRef: BQPartitionId.DatePartitioned)) =>
List(table.withTableType(partitionType).assertPartition(partitionRef.partition))
case (_, _) => List(table.unpartitioned.assertPartition)
}

case BQSqlFrag.FillRef(fill) => List(fill.destination)
case BQSqlFrag.FilledTableRef(fill) => List(fill.tableDef.unpartitioned.assertPartition)
case BQSqlFrag.FilledTableRef(fill) =>
(fill.tableDef.partitionType, outerRef) match {
case (partitionType: BQPartitionType.DatePartitioned, Some(partitionRef: BQPartitionId.DatePartitioned)) =>
List(fill.tableDef.withTableType(partitionType).assertPartition(partitionRef.partition))
case (_, _) => List(fill.tableDef.unpartitioned.assertPartition)
}
}

this.collect(pf).flatten.distinct
this.collect(pf(None)).flatten.distinct
}

final def allReferencedTables: Seq[BQTableLike[Any]] =
allReferencedAsPartitions(expandAndExcludeViews = true)
allReferencedAsPartitions
.map(_.wholeTable)
.filterNot(tableLike => tableLike.isInstanceOf[BQTableDef.View[_]])

Expand All @@ -155,6 +173,7 @@ object BQSqlFrag {
)
}
case class Combined(values: Seq[BQSqlFrag]) extends BQSqlFrag
case class TableRef(table: BQTableLike[Any]) extends BQSqlFrag
case class PartitionRef(ref: BQPartitionId[Any]) extends BQSqlFrag

case class FillRef(fill: BQFill[Any]) extends BQSqlFrag
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/no/nrk/bigquery/BQTableLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ sealed trait BQTableLike[+P] {
def partitionType: BQPartitionType[P]
def withTableType[PP](tpe: BQPartitionType[PP]): BQTableLike[PP]
def unpartitioned: BQTableLike[Unit]
def wholeTable: WholeTable[P] = WholeTable(this)
}

object BQTableLike {
Expand Down Expand Up @@ -96,6 +97,8 @@ sealed trait BQTableDef[+P] extends BQTableLike[P] {
labels.verify(tableId)
}

case class WholeTable[+P](table: BQTableLike[P])

object BQTableDef {

/** @param schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,9 @@ object BQSmokeTest {
(p, Nil)
}

case BQSqlFrag.TableRef(table) =>
recurse(table.unpartitioned.assertPartition.bqShow)

case BQSqlFrag.FilledTableRef(filledTable) =>
recurse(filledTable.tableDef.unpartitioned.assertPartition.bqShow)

Expand Down

0 comments on commit 8bb5d5f

Please sign in to comment.