-
Notifications
You must be signed in to change notification settings - Fork 513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Snowflake (#5500) #5502
base: main
Are you sure you want to change the base?
Conversation
b53021e
to
1cf2986
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5502 +/- ##
==========================================
- Coverage 61.43% 61.25% -0.18%
==========================================
Files 312 315 +3
Lines 11103 11187 +84
Branches 762 765 +3
==========================================
+ Hits 6821 6853 +32
- Misses 4282 4334 +52 ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
d179ff2
to
9f6010d
Compare
(edit: that has been fixed) |
74d15ae
to
901625d
Compare
@RustedBones I finally had the time to fix this build :) |
Sorry for the delay. Will look at it this week ! |
|
||
package com.spotify.scio.snowflake | ||
|
||
trait SnowflakeAuthenticationOptions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trait SnowflakeAuthenticationOptions | |
sealed trait SnowflakeAuthenticationOptions |
To avoid the match may not be exhaustive
warning
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixup 9ede3f3
.withWarehouse(connectionOptions.warehouse) | ||
|
||
connectionOptions.schema | ||
.map(schema => datasourceBeforeSchema.withSchema(schema)) | ||
.getOrElse(datasourceBeforeSchema) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: to avoid using intermediate variable, you can use syntactic sugar from scala.util.chaining._
.
(can be done above too)
.withWarehouse(connectionOptions.warehouse) | |
connectionOptions.schema | |
.map(schema => datasourceBeforeSchema.withSchema(schema)) | |
.getOrElse(datasourceBeforeSchema) | |
.withWarehouse(connectionOptions.warehouse) | |
.pipe(ds => connectionOptions.schema.fold(ds)(ds.withSchema)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TIL
fixup 25c5a76
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With fold
: a9b56c4
new CsvMapper[T] { | ||
override def mapRow(parts: Array[String]): T = { | ||
val unsnowedParts = parts.map { | ||
case "\\N" => "" // needs to be mapped to an Option |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I never used Snowflake. Can you give some context for this case ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SnowflakeIO
from Apache Beam uses COPY
from Snowflake, that simply exports to CSV, that is then read from storage. And this COPY
uses \N
to represent null
.
Thing is, kantan maps empty string with None
, so we provide it.
scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala
Outdated
Show resolved
Hide resolved
scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala
Show resolved
Hide resolved
537f4f1
to
5ccff09
Compare
Hey @turb, can you test the latest change on a real Snowflake DB to make sure everything still works ? |
Still works! (on read) |
private[snowflake] def snowflakeIoId(opts: SnowflakeConnectionOptions, target: String): String = { | ||
// source params | ||
val params = Option(opts.database).map(db => s"db=$db") ++ | ||
Option(opts.warehouse).map(db => s"warehouse=$db") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comparing this to other complex testId
implementations in our Scio IOs (example 1, 2), I think a format like this would fit better:
SnowflakeIO(url, target, warehouse?, db?)
wdyt @RustedBones
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the same as JdbcIO
where we 'normalize' the connection url, hiding credentials. See https://docs.snowflake.com/en/developer-guide/jdbc/jdbc-parameters
Fix #5500
First draft of SnowflakeIO for scio, using the one from Beam.
It has:
ScioContext
andSCollection
implicitsNotes: