Skip to content

Commit

Permalink
Added snowflake extra options to resources (#12)
Browse files Browse the repository at this point in the history
Co-authored-by: Victor Prats <[email protected]>
  • Loading branch information
piffall and Victor Prats authored May 2, 2022
1 parent d88cbb1 commit 74871af
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ case class SnowflakeJoinReader(reader: SnowflakeReader, joinData: DataFrame)(
reader.database,
reader.schema,
stagingTable,
SaveMode.Overwrite).write(joinData)
SaveMode.Overwrite,
reader.sfExtraOptions).write(joinData)
reader.copy(table = None, query = Some(query)).read()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ case class SnowflakeMerger(account: String,
schema: String,
sourceTable: String,
targetTable: String,
pkColumns: Seq[String])(implicit spark: SparkSession) {
pkColumns: Seq[String],
sfExtraOptions: Map[String, String] = Map())(implicit spark: SparkSession) {

val sfOptions = Map(
"sfURL" -> s"${account}.snowflakecomputing.com",
Expand All @@ -38,7 +39,7 @@ case class SnowflakeMerger(account: String,
|WHEN MATCHED THEN DELETE
|""".stripMargin

Utils.runQuery(sfOptions, deleteQuery)
Utils.runQuery(sfOptions ++ sfExtraOptions, deleteQuery)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ case class SnowflakeReader(account: String,
database: String,
schema: String,
table: Option[String] = None,
query: Option[String] = None)(implicit spark: SparkSession)
extends ResourceReader {
query: Option[String] = None,
sfExtraOptions: Map[String, String] = Map())(implicit spark: SparkSession)
extends ResourceReader {

val settings = (table, query) match {
case (Some(tableName), None) => ("dbtable", tableName)
Expand All @@ -35,7 +36,7 @@ case class SnowflakeReader(account: String,
override def read(): DataFrame = {
spark.read
.format("net.snowflake.spark.snowflake")
.options(sfOptions)
.options(sfOptions ++ sfExtraOptions)
.load()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ case class SnowflakeWriter(account: String,
schema: String,
table: String,
mode: SaveMode = SaveMode.Ignore,
sfExtraOptions: Map[String, String] = Map(),
preScript: Option[String] = None)(implicit spark: SparkSession)
extends ResourceWriter {
extends ResourceWriter {

val sfOptions = Map(
"sfURL" -> s"${account}.snowflakecomputing.com",
Expand All @@ -28,7 +29,7 @@ case class SnowflakeWriter(account: String,
override def write(data: DataFrame): Unit = {
data.write
.format("net.snowflake.spark.snowflake")
.options(sfOptions)
.options(sfOptions ++ sfExtraOptions)
.mode(mode)
.save()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ case class SnowflakeWriterMerger(writer: SnowflakeWriter, columns: Seq[String])(
writer.schema,
stagingTable,
targetTable,
columns).merge()
columns,
writer.sfExtraOptions).merge()
}

private def targetExists(): Boolean = {
Expand All @@ -45,8 +46,10 @@ case class SnowflakeWriterMerger(writer: SnowflakeWriter, columns: Seq[String])(
writer.warehouse,
writer.database,
"INFORMATION_SCHEMA",
query =
Some(s"SELECT COUNT(1) = 1 FROM TABLES WHERE TABLE_NAME = '${targetTable}'"))
query = Some(
s"SELECT COUNT(1) = 1 FROM TABLES WHERE TABLE_NAME = '${targetTable}'"),
sfExtraOptions = writer.sfExtraOptions
)

reader
.read()
Expand Down

0 comments on commit 74871af

Please sign in to comment.