[SPARK-24518][CORE] Using Hadoop credential provider API to store password #21548

Closed · wants to merge 3 commits
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -21,6 +21,7 @@ import java.io.File
import java.security.NoSuchAlgorithmException
import javax.net.ssl.SSLContext

import org.apache.hadoop.conf.Configuration
import org.eclipse.jetty.util.ssl.SslContextFactory

import org.apache.spark.internal.Logging
@@ -163,11 +164,16 @@ private[spark] object SSLOptions extends Logging {
* missing in SparkConf, the corresponding setting is used from the default configuration.
*
* @param conf Spark configuration object where the settings are collected from
* @param hadoopConf Hadoop configuration to get settings
* @param ns the namespace name
* @param defaults the default configuration
* @return [[org.apache.spark.SSLOptions]] object
*/
def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
def parse(
conf: SparkConf,
hadoopConf: Configuration,
ns: String,
defaults: Option[SSLOptions] = None): SSLOptions = {
val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))

val port = conf.getWithSubstitution(s"$ns.port").map(_.toInt)
@@ -179,9 +185,11 @@
.orElse(defaults.flatMap(_.keyStore))

val keyStorePassword = conf.getWithSubstitution(s"$ns.keyStorePassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.keyStorePassword")).map(new String(_)))
Contributor: Needs charset (also in others).

Contributor Author: @vanzin the return value of hadoopConf#getPassword is a char array, so there's no way to specify the charset here.

Contributor: new String takes a charset. (In fact the constructor you're calling should be deprecated...)

Contributor Author: Hi @vanzin, I checked the JDK 8 docs again and I can't find a String constructor that takes both a char array and a charset as parameters.

Contributor: Oh, my bad, that's a char array, not a byte array. All is good then.

.orElse(defaults.flatMap(_.keyStorePassword))

val keyPassword = conf.getWithSubstitution(s"$ns.keyPassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.keyPassword")).map(new String(_)))
.orElse(defaults.flatMap(_.keyPassword))

val keyStoreType = conf.getWithSubstitution(s"$ns.keyStoreType")
@@ -194,6 +202,7 @@
.orElse(defaults.flatMap(_.trustStore))

val trustStorePassword = conf.getWithSubstitution(s"$ns.trustStorePassword")
.orElse(Option(hadoopConf.getPassword(s"$ns.trustStorePassword")).map(new String(_)))
.orElse(defaults.flatMap(_.trustStorePassword))

val trustStoreType = conf.getWithSubstitution(s"$ns.trustStoreType")
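The new lookup order in `SSLOptions.parse` can be sketched in isolation. The snippet below is a minimal, self-contained model of the precedence chain (explicit SparkConf value, then Hadoop credential provider, then inherited defaults); `confGet`, `hadoopGet`, and `defaultGet` are invented stand-ins for `SparkConf.getWithSubstitution`, `Configuration.getPassword`, and the `defaults.flatMap(...)` fallback, not real Spark APIs.

```scala
object PasswordPrecedence {
  // Model of the orElse chain added by this PR. The three lookup functions
  // are hypothetical stand-ins for the real SparkConf/Hadoop calls.
  def resolve(
      confGet: String => Option[String],
      hadoopGet: String => Option[Array[Char]],
      defaultGet: => Option[String],
      key: String): Option[String] = {
    confGet(key)                                  // 1. explicit SparkConf value
      .orElse(hadoopGet(key).map(new String(_)))  // 2. Hadoop credential provider
      .orElse(defaultGet)                         // 3. inherited default SSLOptions
  }

  def main(args: Array[String]): Unit = {
    val providerStore = Map("spark.ssl.keyStorePassword" -> "secret".toCharArray)
    val resolved = resolve(
      confGet = _ => None,          // not set in SparkConf
      hadoopGet = providerStore.get, // found in the credential store
      defaultGet = Some("fallback"),
      key = "spark.ssl.keyStorePassword")
    println(resolved) // prints Some(secret)
  }
}
```

As noted in the review thread above, `Configuration.getPassword` returns a char array, so the `new String(_)` call performs no byte decoding and needs no charset.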
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -19,11 +19,11 @@ package org.apache.spark

import java.net.{Authenticator, PasswordAuthentication}
import java.nio.charset.StandardCharsets.UTF_8
import javax.net.ssl._

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher
@@ -111,11 +111,14 @@ private[spark] class SecurityManager(
)
}

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)
private val defaultSSLOptions =
SSLOptions.parse(sparkConf, hadoopConf, "spark.ssl", defaults = None)

def getSSLOptions(module: String): SSLOptions = {
val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions))
val opts =
SSLOptions.parse(sparkConf, hadoopConf, s"spark.ssl.$module", Some(defaultSSLOptions))
logDebug(s"Created SSL options for $module: $opts")
opts
}
75 changes: 69 additions & 6 deletions core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -18,8 +18,11 @@
package org.apache.spark

import java.io.File
import java.util.UUID
import javax.net.ssl.SSLContext

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.alias.{CredentialProvider, CredentialProviderFactory}
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.util.SparkConfWithEnv
@@ -40,6 +43,7 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
.toSet

val conf = new SparkConf
val hadoopConf = new Configuration()
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", keyStorePath)
conf.set("spark.ssl.keyStorePassword", "password")
@@ -49,7 +53,7 @@
conf.set("spark.ssl.enabledAlgorithms", algorithms.mkString(","))
conf.set("spark.ssl.protocol", "TLSv1.2")

val opts = SSLOptions.parse(conf, "spark.ssl")
val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl")

assert(opts.enabled === true)
assert(opts.trustStore.isDefined === true)
@@ -70,6 +74,7 @@
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath

val conf = new SparkConf
val hadoopConf = new Configuration()
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", keyStorePath)
conf.set("spark.ssl.keyStorePassword", "password")
@@ -80,8 +85,8 @@
"TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.protocol", "SSLv3")

val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
val opts = SSLOptions.parse(conf, "spark.ssl.ui", defaults = Some(defaultOpts))
val defaultOpts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl.ui", defaults = Some(defaultOpts))

assert(opts.enabled === true)
assert(opts.trustStore.isDefined === true)
@@ -103,6 +108,7 @@
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath

val conf = new SparkConf
val hadoopConf = new Configuration()
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.ui.enabled", "false")
conf.set("spark.ssl.ui.port", "4242")
@@ -117,8 +123,8 @@
conf.set("spark.ssl.ui.enabledAlgorithms", "ABC, DEF")
conf.set("spark.ssl.protocol", "SSLv3")

val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
val opts = SSLOptions.parse(conf, "spark.ssl.ui", defaults = Some(defaultOpts))
val defaultOpts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl.ui", defaults = Some(defaultOpts))

assert(opts.enabled === false)
assert(opts.port === Some(4242))
@@ -139,14 +145,71 @@
val conf = new SparkConfWithEnv(Map(
"ENV1" -> "val1",
"ENV2" -> "val2"))
val hadoopConf = new Configuration()

conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", "${env:ENV1}")
conf.set("spark.ssl.trustStore", "${env:ENV2}")

val opts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
assert(opts.keyStore === Some(new File("val1")))
assert(opts.trustStore === Some(new File("val2")))
}

test("get password from Hadoop credential provider") {
val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath

val conf = new SparkConf
val hadoopConf = new Configuration()
val tmpPath = s"localjceks://file${sys.props("java.io.tmpdir")}/test-" +
s"${UUID.randomUUID().toString}.jceks"
val provider = createCredentialProvider(tmpPath, hadoopConf)

conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", keyStorePath)
storePassword(provider, "spark.ssl.keyStorePassword", "password")
storePassword(provider, "spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
storePassword(provider, "spark.ssl.trustStorePassword", "password")
conf.set("spark.ssl.enabledAlgorithms",
"TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.protocol", "SSLv3")

val defaultOpts = SSLOptions.parse(conf, hadoopConf, "spark.ssl", defaults = None)
val opts = SSLOptions.parse(conf, hadoopConf, "spark.ssl.ui", defaults = Some(defaultOpts))

assert(opts.enabled === true)
assert(opts.trustStore.isDefined === true)
assert(opts.trustStore.get.getName === "truststore")
assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
assert(opts.keyStore.isDefined === true)
assert(opts.keyStore.get.getName === "keystore")
assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
assert(opts.trustStorePassword === Some("password"))
assert(opts.keyStorePassword === Some("password"))
assert(opts.keyPassword === Some("password"))
assert(opts.protocol === Some("SSLv3"))
assert(opts.enabledAlgorithms ===
Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
}

private def createCredentialProvider(tmpPath: String, conf: Configuration): CredentialProvider = {
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, tmpPath)

val provider = CredentialProviderFactory.getProviders(conf).get(0)
if (provider == null) {
throw new IllegalStateException(s"Failed to get credential provider with path $tmpPath")
}

provider
}

private def storePassword(
provider: CredentialProvider,
passwordKey: String,
password: String): Unit = {
provider.createCredentialEntry(passwordKey, password.toCharArray)
provider.flush()
}
}
23 changes: 22 additions & 1 deletion docs/security.md
@@ -177,7 +177,7 @@ ACLs can be configured for either users or groups. Configuration entries accept
lists as input, meaning multiple users or groups can be given the desired privileges. This can be
used if you run on a shared cluster and have a set of administrators or developers who need to
monitor applications they may not have started themselves. A wildcard (`*`) added to specific ACL
means that all users will have the respective pivilege. By default, only the user submitting the
means that all users will have the respective privilege. By default, only the user submitting the
application is added to the ACLs.

Group membership is established by using a configurable group mapping provider. The mapper is
@@ -446,6 +446,27 @@ replaced with one of the above namespaces.
</tr>
</table>

Spark also supports retrieving `${ns}.keyPassword`, `${ns}.keyStorePassword` and `${ns}.trustStorePassword` from
[Hadoop Credential Providers](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CredentialProviderAPI.html).
Users can store passwords in a credential file and make them accessible to different components, for example:

```
hadoop credential create spark.ssl.keyPassword -value password \
-provider jceks://[email protected]:9001/user/backup/ssl.jceks
```

To configure the location of the credential provider, set the `hadoop.security.credential.provider.path`
config option in the Hadoop configuration used by Spark, for example:

```
<property>
<name>hadoop.security.credential.provider.path</name>
<value>jceks://[email protected]:9001/user/backup/ssl.jceks</value>
</property>
```

Alternatively, the option can be set through SparkConf, since any `spark.hadoop.*` entry is copied into the Hadoop configuration: `spark.hadoop.hadoop.security.credential.provider.path=jceks://[email protected]:9001/user/backup/ssl.jceks`.
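For reference, the same setting can be applied programmatically when constructing the `SparkConf`. This is a sketch assuming a standard Spark application setup: the `spark.hadoop.` prefix causes Spark to copy the entry into the Hadoop `Configuration` it builds, which is the configuration `SSLOptions.parse` consults.

```scala
import org.apache.spark.SparkConf

// Any key prefixed with "spark.hadoop." is propagated into the Hadoop
// Configuration created by SparkHadoopUtil, making the credential
// provider path visible to the SSL password lookup.
val conf = new SparkConf()
  .set("spark.hadoop.hadoop.security.credential.provider.path",
    "jceks://[email protected]:9001/user/backup/ssl.jceks")
```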

## Preparing the key stores

Key stores can be generated by `keytool` program. The reference documentation for this tool for