Skip to content

Commit

Permalink
rdar://136301817 loadS3Authz is not triggered when sc created first (apache#2063)

Browse files Browse the repository at this point in the history

This PR moves `loadS3Authz` to `SparkContext` so that the unified auth
can be effective for `SparkContext` use cases
  • Loading branch information
kazuyukitanimura authored and GitHub Enterprise committed Sep 23, 2024
1 parent 97a5259 commit 840d605
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 44 deletions.
32 changes: 32 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ class SparkContext(config: SparkConf) extends Logging {
Utils.setLogLevel(Level.toLevel(upperCased))
}

SparkContext.loadS3Authz(config)

try {
_conf = config.clone()
_conf.get(SPARK_LOG_LEVEL).foreach { level =>
Expand Down Expand Up @@ -2811,6 +2813,36 @@ object SparkContext extends Logging {
}
}

private def loadS3Authz(conf: SparkConf): Unit = {
if (conf.getBoolean("spark.s3authz.enabled", isS3AuthzEnabled)) {
conf.set(
"spark.hadoop.fs.s3a.custom.signers",
"CustomS3Signer" +
":com.apple.authz.shaded.acs.unified.authz.ext.s3a.UnifiedAuthzSTSCredsSigner" +
":com.apple.authz.shaded.acs.unified.authz.ext.s3a.UnifiedAuthSignerInitializer")
logWarning("""[Unified Auth] Setting "spark.hadoop.fs.s3a.custom.signers"""")
conf.set("spark.hadoop.fs.s3a.s3.signing-algorithm", "CustomS3Signer")
logWarning("""[Unified Auth] Setting "spark.hadoop.fs.s3a.s3.signing-algorithm"""")
logWarning("[Unified Auth] Enabled")
}
}

/**
* Whether UC-Spark-Authz extension is enabled
*/
private[spark] def isUCAuthzEnabled: Boolean = {
val v = System.getenv("UCAUTHZ")
v != null && v.toBoolean
}

/**
* Whether Unified Authz S3 custom signer is enabled
*/
private[spark] def isS3AuthzEnabled: Boolean = {
val v = System.getenv("S3AUTHZ")
isUCAuthzEnabled && (v == null || v.toBoolean)
}

/** Return the current active [[SparkContext]] if any. */
private[spark] def getActive: Option[SparkContext] = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
Expand Down
4 changes: 3 additions & 1 deletion python/pyspark/sql/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@ def test_s3authz_default(self):
session = None
try:
session = SparkSession.builder.master("local").getOrCreate()
self.assertEqual(session.conf.get("spark.hadoop.fs.s3a.s3.signing-algorithm", None), None)
self.assertEqual(
session.conf.get("spark.hadoop.fs.s3a.s3.signing-algorithm", None),
"CustomS3Signer" if os.environ["UCAUTHZ"].lower() == "true" else None)
finally:
if session is not None:
session.stop()
Expand Down
34 changes: 34 additions & 0 deletions rio.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ builds:
- ENABLE_COMET=false build/sbt-nonroot -Phadoop-3 "core/test"
- ENABLE_COMET=true build/sbt-nonroot -Phadoop-3 "core/test"

- name: apple-spark-2.12_3.4.3-snapshot-core-ucauthz
branchName: branch-3.4.3-apple
build:
template: freestyle:v4:build
steps:
- UCAUTHZ=true ENABLE_COMET=false build/sbt-nonroot -Phadoop-3 "core/test"
- UCAUTHZ=true ENABLE_COMET=true build/sbt-nonroot -Phadoop-3 "core/test"

- name: apple-spark-2.13_3.4.3-snapshot-core
branchName: branch-3.4.3-apple
build:
Expand All @@ -87,6 +95,15 @@ builds:
- ENABLE_COMET=false build/sbt-nonroot -Pscala-2.13 -Phadoop-3 "core/test"
- ENABLE_COMET=true build/sbt-nonroot -Pscala-2.13 -Phadoop-3 "core/test"

- name: apple-spark-2.13_3.4.3-snapshot-core-ucauthz
branchName: branch-3.4.3-apple
build:
template: freestyle:v4:build
steps:
- dev/change-scala-version.sh 2.13
- UCAUTHZ=true ENABLE_COMET=false build/sbt-nonroot -Pscala-2.13 -Phadoop-3 "core/test"
- UCAUTHZ=true ENABLE_COMET=true build/sbt-nonroot -Pscala-2.13 -Phadoop-3 "core/test"

- name: apple-spark-2.12_3.4.3-snapshot-core-pr
branchName: branch-3.4.3-apple
build:
Expand All @@ -95,6 +112,14 @@ builds:
- ENABLE_COMET=false build/sbt-nonroot -Phadoop-3 "core/test"
- ENABLE_COMET=true build/sbt-nonroot -Phadoop-3 "core/test"

- name: apple-spark-2.12_3.4.3-snapshot-core-ucauthz-pr
branchName: branch-3.4.3-apple
build:
template: freestyle:v4:prb
steps:
- UCAUTHZ=true ENABLE_COMET=false build/sbt-nonroot -Phadoop-3 "core/test"
- UCAUTHZ=true ENABLE_COMET=true build/sbt-nonroot -Phadoop-3 "core/test"

- name: apple-spark-2.13_3.4.3-snapshot-core-pr
branchName: branch-3.4.3-apple
build:
Expand All @@ -104,6 +129,15 @@ builds:
- ENABLE_COMET=false build/sbt-nonroot -Pscala-2.13 -Phadoop-3 "core/test"
- ENABLE_COMET=true build/sbt-nonroot -Pscala-2.13 -Phadoop-3 "core/test"

- name: apple-spark-2.13_3.4.3-snapshot-core-ucauthz-pr
branchName: branch-3.4.3-apple
build:
template: freestyle:v4:prb
steps:
- dev/change-scala-version.sh 2.13
- UCAUTHZ=true ENABLE_COMET=false build/sbt-nonroot -Pscala-2.13 -Phadoop-3 "core/test"
- UCAUTHZ=true ENABLE_COMET=true build/sbt-nonroot -Pscala-2.13 -Phadoop-3 "core/test"

- name: apple-spark-2.12_3.4.3-snapshot-streaming
branchName: branch-3.4.3-apple
build:
Expand Down
30 changes: 1 addition & 29 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,6 @@ object SparkSession extends Logging {
def getOrCreate(): SparkSession = synchronized {
val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }
loadS3Authz(sparkConf)

if (!sparkConf.get(EXECUTOR_ALLOW_SPARK_CONTEXT)) {
assertOnDriver()
Expand Down Expand Up @@ -1306,17 +1305,6 @@ object SparkSession extends Logging {
}
}

private def loadS3Authz(conf: SparkConf): Unit = {
if (conf.getBoolean("spark.s3authz.enabled", isS3AuthzEnabled)) {
conf.set(
"spark.hadoop.fs.s3a.custom.signers",
"CustomS3Signer" +
":com.apple.authz.shaded.acs.unified.authz.ext.s3a.UnifiedAuthzSTSCredsSigner" +
":com.apple.authz.shaded.acs.unified.authz.ext.s3a.UnifiedAuthSignerInitializer")
conf.set("spark.hadoop.fs.s3a.s3.signing-algorithm", "CustomS3Signer")
}
}

private def loadCometExtension(sparkContext: SparkContext): Seq[String] = {
if (sparkContext.getConf.getBoolean("spark.comet.enabled", isCometEnabled)) {
Seq("org.apache.comet.CometSparkSessionExtensions")
Expand All @@ -1326,7 +1314,7 @@ object SparkSession extends Logging {
}

private def loadUCAuthzExtension(sparkContext: SparkContext): Seq[String] = {
if (sparkContext.getConf.getBoolean("spark.ucauthz.enabled", isUCAuthzEnabled)) {
if (sparkContext.getConf.getBoolean("spark.ucauthz.enabled", SparkContext.isUCAuthzEnabled)) {
Seq("com.apple.acs.illuminata.authorizer.UCSparkSQLExtension")
} else {
Seq.empty
Expand Down Expand Up @@ -1394,22 +1382,6 @@ object SparkSession extends Logging {
v == null || v.toBoolean
}

/**
* Whether UC-Spark-Authz extension is enabled
*/
def isUCAuthzEnabled: Boolean = {
val v = System.getenv("UCAUTHZ")
v != null && v.toBoolean
}

/**
* Whether Unified Authz S3 custom signer is enabled
*/
def isS3AuthzEnabled: Boolean = {
val v = System.getenv("S3AUTHZ")
isUCAuthzEnabled && (v == null || v.toBoolean)
}

/**
* Whether Iceberg extension is enabled
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.scalactic.source.Position
import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
Expand Down Expand Up @@ -282,12 +282,12 @@ private[sql] trait SQLTestUtilsBase
/**
* Whether UC-Spark-Authz extension is enabled
*/
protected def isUCAuthzEnabled: Boolean = SparkSession.isUCAuthzEnabled
protected def isUCAuthzEnabled: Boolean = SparkContext.isUCAuthzEnabled

/**
* Whether Unified Authz S3 custom signer is enabled
*/
protected def isS3AuthzEnabled: Boolean = SparkSession.isS3AuthzEnabled
protected def isS3AuthzEnabled: Boolean = SparkContext.isS3AuthzEnabled

protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
SparkSession.setActiveSession(spark)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.config.UI._
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext}
import org.apache.spark.sql.SparkSession.{isS3AuthzEnabled, isUCAuthzEnabled}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
Expand Down Expand Up @@ -100,22 +99,13 @@ object TestHive
}
}

if (isUCAuthzEnabled) {
if (SparkContext.isUCAuthzEnabled) {
val extensions = conf.getOption("spark.sql.extensions").toSeq ++
Seq("com.apple.acs.illuminata.authorizer.UCSparkSQLExtension")
conf
.set("spark.sql.extensions", extensions.mkString(","))
.set("spark.ucauthz.enabled", "true")
}
if (isS3AuthzEnabled) {
conf
.set("spark.hadoop.fs.s3a.custom.signers",
"CustomS3Signer" +
":com.apple.authz.shaded.acs.unified.authz.ext.s3a.UnifiedAuthzSTSCredsSigner" +
":com.apple.authz.shaded.acs.unified.authz.ext.s3a.UnifiedAuthSignerInitializer")
.set("spark.hadoop.fs.s3a.s3.signing-algorithm", "CustomS3Signer")
.set("spark.s3authz.enabled", "true")
}
conf
}
))
Expand Down

0 comments on commit 840d605

Please sign in to comment.