From 35134a0c0f0d36f6203836f9159f70e08c5514ef Mon Sep 17 00:00:00 2001
From: Alex Moore
Date: Fri, 5 Mar 2021 09:55:52 -0500
Subject: [PATCH 1/2] Add optional sse_kms_key parameter and use it in the
 Redshift UNLOAD operation
---
.../spark/redshift/Parameters.scala | 6 +++
.../spark/redshift/RedshiftRelation.scala | 9 ++--
.../spark/redshift/RedshiftSourceSuite.scala | 44 +++++++++++++++++++
3 files changed, 56 insertions(+), 3 deletions(-)
diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
index 875f5b75..71b645d5 100644
--- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -30,6 +30,7 @@ private[redshift] object Parameters {
// * sortkeyspec has no default, but is optional
// * distkey has no default, but is optional unless using diststyle KEY
// * jdbcdriver has no default, but is optional
+ // * sse_kms_key has no default, but is optional
"forward_spark_s3_credentials" -> "false",
"tempformat" -> "AVRO",
@@ -285,5 +286,10 @@ private[redshift] object Parameters {
new BasicSessionCredentials(accessKey, secretAccessKey, sessionToken))
}
}
+
+ /**
+ * The AWS SSE-KMS key ID to use for encryption during UNLOAD operations, instead of AWS's default encryption.
+ */
+ def sseKmsKey: Option[String] = parameters.get("sse_kms_key")
}
}
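
A minimal usage sketch of the new option from the Spark side. The connection URL, table, tempdir, and key ID below are illustrative placeholders, not values from this patch; only the sse_kms_key option key is new:

    // Assumes an active SparkSession named `spark` and working Redshift/S3 credentials.
    val df = spark.read
      .format("com.databricks.spark.redshift")
      .option("url", "jdbc:redshift://examplecluster:5439/dev?user=user&password=pass") // placeholder
      .option("dbtable", "my_table")                                                    // placeholder
      .option("tempdir", "s3a://my-bucket/spark-redshift-tmp/")                         // placeholder
      .option("forward_spark_s3_credentials", "true")
      .option("sse_kms_key", "1234abcd-12ab-34cd-56ef-1234567890ab") // KMS key ID to use for the UNLOAD
      .load()
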
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
index 31dc11b2..02638638 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
@@ -130,7 +130,7 @@ private[redshift] case class RedshiftRelation(
} else {
// Unload data from Redshift into a temporary directory in S3:
val tempDir = params.createPerQueryTempDir()
- val unloadSql = buildUnloadStmt(requiredColumns, filters, tempDir, creds)
+ val unloadSql = buildUnloadStmt(requiredColumns, filters, tempDir, creds, params.sseKmsKey)
log.info(unloadSql)
val conn = jdbcWrapper.getConnector(params.jdbcDriver, params.jdbcUrl, params.credentials)
try {
@@ -176,7 +176,8 @@ private[redshift] case class RedshiftRelation(
requiredColumns: Array[String],
filters: Array[Filter],
tempDir: String,
- creds: AWSCredentialsProvider): String = {
+ creds: AWSCredentialsProvider,
+ sseKmsKey: Option[String]): String = {
assert(!requiredColumns.isEmpty)
// Always quote column names:
val columnList = requiredColumns.map(col => s""""$col"""").mkString(", ")
@@ -193,7 +194,9 @@ private[redshift] case class RedshiftRelation(
// the credentials passed via `credsString`.
val fixedUrl = Utils.fixS3Url(Utils.removeCredentialsFromURI(new URI(tempDir)).toString)
- s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString' ESCAPE MANIFEST"
+ val sseKmsClause = sseKmsKey.map(key => s"KMS_KEY_ID '$key' ENCRYPTED").getOrElse("")
+
+ s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString' ESCAPE MANIFEST $sseKmsClause"
}
private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {
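
For illustration, a sketch of how the new clause composes into the UNLOAD text; the key ID is a placeholder:

    // Mirrors the construction above with a placeholder key ID.
    val sseKmsKey: Option[String] = Some("1234abcd-12ab-34cd-56ef-1234567890ab")
    val sseKmsClause = sseKmsKey.map(key => s"KMS_KEY_ID '$key' ENCRYPTED").getOrElse("")
    // With the key set, the generated statement ends in:
    //   ... ESCAPE MANIFEST KMS_KEY_ID '1234abcd-12ab-34cd-56ef-1234567890ab' ENCRYPTED
    // With sse_kms_key unset, sseKmsClause is empty and the statement is unchanged
    // apart from a trailing space.
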
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index ac2a644a..8784d248 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -305,6 +305,50 @@ class RedshiftSourceSuite
mockRedshift.verifyThatExpectedQueriesWereIssued(Seq(expectedQuery))
}
+ test("DefaultSource adds SSE-KMS clause") {
+ // scalastyle:off
+ unloadedData =
+ """
+ |1|t
+ |1|f
+ |0|
+ |0|f
+ ||
+ """.stripMargin.trim
+ // scalastyle:on
+ val kmsKeyId = "abc123"
+ val expectedQuery = (
+ "UNLOAD \\('SELECT \"testbyte\", \"testbool\" FROM \"PUBLIC\".\"test_table\" '\\) " +
+ "TO '.*' " +
+ "WITH CREDENTIALS 'aws_access_key_id=test1;aws_secret_access_key=test2' " +
+ "ESCAPE MANIFEST " +
+ "KMS_KEY_ID 'abc123' ENCRYPTED").r
+ val mockRedshift =
+ new MockRedshift(defaultParams("url"), Map("test_table" -> TestUtils.testSchema))
+ // Construct the source with a custom schema
+ val source = new DefaultSource(mockRedshift.jdbcWrapper, _ => mockS3Client)
+ val params = defaultParams + ("sse_kms_key" -> kmsKeyId)
+ val relation = source.createRelation(testSqlContext, params, TestUtils.testSchema)
+ val resultSchema =
+ StructType(Seq(StructField("testbyte", ByteType), StructField("testbool", BooleanType)))
+
+ val rdd = relation.asInstanceOf[PrunedFilteredScan]
+ .buildScan(Array("testbyte", "testbool"), Array.empty[Filter])
+ .mapPartitions { iter =>
+ val fromRow = RowEncoder(resultSchema).resolveAndBind().fromRow _
+ iter.asInstanceOf[Iterator[InternalRow]].map(fromRow)
+ }
+ val prunedExpectedValues = Array(
+ Row(1.toByte, true),
+ Row(1.toByte, false),
+ Row(0.toByte, null),
+ Row(0.toByte, false),
+ Row(null, null))
+ assert(rdd.collect() === prunedExpectedValues)
+ mockRedshift.verifyThatConnectionsWereClosed()
+ mockRedshift.verifyThatExpectedQueriesWereIssued(Seq(expectedQuery))
+ }
+
test("DefaultSource supports preactions options to run queries before running COPY command") {
val mockRedshift = new MockRedshift(
defaultParams("url"),
From cef8c3a8d7d9db57af3ed868232fb55eed263b70 Mon Sep 17 00:00:00 2001
From: Alex Moore
Date: Fri, 5 Mar 2021 10:13:02 -0500
Subject: [PATCH 2/2] Update docs
---
README.md | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git a/README.md b/README.md
index 2a299819..530e23ff 100644
--- a/README.md
+++ b/README.md
@@ -624,6 +624,19 @@ for other options).
Note that since these options are appended to the end of the COPY command, only options that make sense
at the end of the command can be used, but that should cover most possible use cases.
+
+  <tr>
+    <td><tt>sse_kms_key</tt></td>
+    <td>No</td>
+    <td>No default</td>
+    <td>
+<p>The KMS key ID to use for server-side encryption in S3 during the Redshift <tt>UNLOAD</tt> operation, rather than
+AWS's default encryption. The Redshift IAM role must have write access to the KMS key, and the IAM role or credentials
+used by Spark must have read access to the key. Reading the encrypted data back requires no code changes (AWS handles
+the decryption under the hood) as long as Spark's role has the proper access.
+See the Redshift docs for more information.</p>
+    </td>
+  </tr>
    <td><tt>tempformat</tt> (Experimental)</td>