Skip to content

Commit

Permalink
Differentiate between URI and SSL settings for in-cluster vs. submiss…
Browse files Browse the repository at this point in the history
…ion (apache#281)
  • Loading branch information
mccheah authored and ash211 committed May 19, 2017
1 parent 88306b2 commit 8f6f0a0
Show file tree
Hide file tree
Showing 10 changed files with 341 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

private[spark] object OptionRequirements {

def requireBothOrNeitherDefined(
opt1: Option[_],
opt2: Option[_],
errMessageWhenFirstIsMissing: String,
errMessageWhenSecondIsMissing: String): Unit = {
requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
}

def requireSecondIfFirstIsDefined(
opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
opt1.foreach { _ =>
require(opt2.isDefined, errMessageWhenSecondIsMissing)
}
}

def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ package object config extends Logging {
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer"
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE =
"kubernetes.resourceStagingServer.internal"
private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem")
.doc("Certificate PEM file to use when having the resource staging server" +
Expand All @@ -370,47 +372,98 @@ package object config extends Logging {
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.clientCertPem")
.doc("Certificate PEM file to use when the client contacts the resource staging server.")
.doc("Certificate PEM file to use when the client contacts the resource staging server." +
" This must strictly be a path to a file on the submitting machine's disk.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_CLIENT_CERT_PEM =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.clientCertPem")
.doc("Certificate PEM file to use when the init-container contacts the resource staging" +
" server. If this is not provided, it defaults to the value of" +
" spark.ssl.kubernetes.resourceStagingServer.clientCertPem. This can be a URI with" +
" a scheme of local:// which denotes that the file is pre-mounted on the init-container's" +
" disk. A uri without a scheme or a scheme of file:// will result in this file being" +
" mounted from the submitting machine's disk as a secret into the pods.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile")
.doc("File containing the keystore password for the Kubernetes dependency server.")
.doc("File containing the keystore password for the Kubernetes resource staging server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyPasswordFile")
.doc("File containing the key password for the Kubernetes dependency server.")
.doc("File containing the key password for the Kubernetes resource staging server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_SSL_ENABLED =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.enabled")
.doc("Whether or not to use SSL when communicating with the dependency server.")
.doc("Whether or not to use SSL when communicating with the resource staging server.")
.booleanConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.enabled")
.doc("Whether or not to use SSL when communicating with the resource staging server from" +
" the init-container. If this is not provided, defaults to" +
" the value of spark.ssl.kubernetes.resourceStagingServer.enabled")
.booleanConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStore")
.doc("File containing the trustStore to communicate with the Kubernetes dependency server.")
.doc("File containing the trustStore to communicate with the Kubernetes dependency server." +
" This must strictly be a path on the submitting machine's disk.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_FILE =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStore")
.doc("File containing the trustStore to communicate with the Kubernetes dependency server" +
" from the init-container. If this is not provided, defaults to the value of" +
" spark.ssl.kubernetes.resourceStagingServer.trustStore. This can be a URI with a scheme" +
" of local:// indicating that the trustStore is pre-mounted on the init-container's" +
" disk. If no scheme, or a scheme of file:// is provided, this file is mounted from the" +
" submitting machine's disk as a Kubernetes secret into the pods.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStorePassword")
.doc("Password for the trustStore for talking to the dependency server.")
.doc("Password for the trustStore for communicating to the dependency server.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_PASSWORD =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStorePassword")
.doc("Password for the trustStore for communicating to the dependency server from the" +
" init-container. If this is not provided, defaults to" +
" spark.ssl.kubernetes.resourceStagingServer.trustStorePassword.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStoreType")
.doc("Type of trustStore for communicating with the dependency server.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_TYPE =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStoreType")
.doc("Type of trustStore for communicating with the dependency server from the" +
" init-container. If this is not provided, defaults to" +
" spark.ssl.kubernetes.resourceStagingServer.trustStoreType")
.stringConf
.createOptional

// Driver and Init-Container parameters for submission v2
private[spark] val RESOURCE_STAGING_SERVER_URI =
ConfigBuilder("spark.kubernetes.resourceStagingServer.uri")
.doc("Base URI for the Spark resource staging server")
.doc("Base URI for the Spark resource staging server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_URI =
ConfigBuilder("spark.kubernetes.resourceStagingServer.internal.uri")
.doc("Base URI for the Spark resource staging server when the init-containers access it for" +
" downloading resources. If this is not provided, it defaults to the value provided in" +
" spark.kubernetes.resourceStagingServer.uri, the URI that the submission client uses to" +
" upload the resources from outside the cluster.")
.stringConf
.createOptional

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ package object constants {
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY =
"downloadSubmittedFilesSecret"
private[spark] val INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY = "trustStore"
private[spark] val INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY = "ssl-certificate"
private[spark] val INIT_CONTAINER_CONFIG_MAP_KEY = "download-submitted-files"
private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package org.apache.spark.deploy.kubernetes.submit.v2

import org.apache.spark.{SparkConf, SSLOptions}
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.kubernetes.v2.RetrofitClientFactoryImpl
import org.apache.spark.util.Utils

/**
* Interface that wraps the provision of everything the submission client needs to set up the
Expand All @@ -47,10 +48,51 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
kubernetesAppId: String,
sparkJars: Seq[String],
sparkFiles: Seq[String],
resourceStagingServerSslOptions: SSLOptions)
resourceStagingServerExternalSslOptions: SSLOptions)
extends DriverInitContainerComponentsProvider {

private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
private val maybeResourceStagingServerInternalUri =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI)
private val maybeResourceStagingServerInternalTrustStore =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_FILE)
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE))
private val maybeResourceStagingServerInternalTrustStorePassword =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_PASSWORD)
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD))
private val maybeResourceStagingServerInternalTrustStoreType =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_TYPE)
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE))
private val maybeResourceStagingServerInternalClientCert =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_CLIENT_CERT_PEM)
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM))
private val resourceStagingServerInternalSslEnabled =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED)
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_SSL_ENABLED))
.getOrElse(false)

OptionRequirements.requireNandDefined(
maybeResourceStagingServerInternalClientCert,
maybeResourceStagingServerInternalTrustStore,
"Cannot provide both a certificate file and a trustStore file for init-containers to" +
" use for contacting the resource staging server over TLS.")

require(maybeResourceStagingServerInternalTrustStore.forall { trustStore =>
Option(Utils.resolveURI(trustStore).getScheme).getOrElse("file") match {
case "file" | "local" => true
case _ => false
}
}, "TrustStore URI used for contacting the resource staging server from init containers must" +
" have no scheme, or scheme file://, or scheme local://.")

require(maybeResourceStagingServerInternalClientCert.forall { trustStore =>
Option(Utils.resolveURI(trustStore).getScheme).getOrElse("file") match {
case "file" | "local" => true
case _ => false
}
}, "Client cert file URI used for contacting the resource staging server from init containers" +
" must have no scheme, or scheme file://, or scheme local://.")

private val jarsDownloadPath = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
private val maybeSecretName = maybeResourceStagingServerUri.map { _ =>
Expand All @@ -71,14 +113,20 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
filesResourceId <- maybeSubmittedResourceIds.map(_.filesResourceId)
} yield {
new SubmittedDependencyInitContainerConfigPluginImpl(
stagingServerUri,
// Configure the init-container with the internal URI over the external URI.
maybeResourceStagingServerInternalUri.getOrElse(stagingServerUri),
jarsResourceId,
filesResourceId,
INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH,
resourceStagingServerSslOptions)
INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
resourceStagingServerInternalSslEnabled,
maybeResourceStagingServerInternalTrustStore,
maybeResourceStagingServerInternalClientCert,
maybeResourceStagingServerInternalTrustStorePassword,
maybeResourceStagingServerInternalTrustStoreType,
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
}
new SparkInitContainerConfigMapBuilderImpl(
sparkJars,
Expand Down Expand Up @@ -113,7 +161,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
stagingServerUri,
sparkJars,
sparkFiles,
resourceStagingServerSslOptions,
resourceStagingServerExternalSslOptions,
RetrofitClientFactoryImpl)
}
}
Expand All @@ -133,7 +181,9 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
resourceStagingServerSslOptions)
INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
maybeResourceStagingServerInternalTrustStore,
maybeResourceStagingServerInternalClientCert)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
*/
package org.apache.spark.deploy.kubernetes.submit.v2

import org.apache.spark.SSLOptions
import org.apache.spark.SparkException
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.config.OptionalConfigEntry
import org.apache.spark.util.Utils

private[spark] trait SubmittedDependencyInitContainerConfigPlugin {
/**
Expand All @@ -34,36 +35,62 @@ private[spark] trait SubmittedDependencyInitContainerConfigPlugin {
}

private[spark] class SubmittedDependencyInitContainerConfigPluginImpl(
resourceStagingServerUri: String,
internalResourceStagingServerUri: String,
jarsResourceId: String,
filesResourceId: String,
jarsSecretKey: String,
filesSecretKey: String,
trustStoreSecretKey: String,
secretsVolumeMountPath: String,
resourceStagingServiceSslOptions: SSLOptions)
clientCertSecretKey: String,
resourceStagingServerSslEnabled: Boolean,
maybeInternalTrustStoreUri: Option[String],
maybeInternalClientCertUri: Option[String],
maybeInternalTrustStorePassword: Option[String],
maybeInternalTrustStoreType: Option[String],
secretsVolumeMountPath: String)
extends SubmittedDependencyInitContainerConfigPlugin {

override def configurationsToFetchSubmittedDependencies(): Map[String, String] = {
Map[String, String](
RESOURCE_STAGING_SERVER_URI.key -> resourceStagingServerUri,
RESOURCE_STAGING_SERVER_URI.key -> internalResourceStagingServerUri,
INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER.key -> jarsResourceId,
INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION.key ->
s"$secretsVolumeMountPath/$jarsSecretKey",
INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER.key -> filesResourceId,
INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION.key ->
s"$secretsVolumeMountPath/$filesSecretKey",
RESOURCE_STAGING_SERVER_SSL_ENABLED.key ->
resourceStagingServiceSslOptions.enabled.toString) ++
resourceStagingServiceSslOptions.trustStore.map { _ =>
(RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE.key,
s"$secretsVolumeMountPath/$trustStoreSecretKey")
}.toMap ++
resourceStagingServiceSslOptions.trustStorePassword.map { password =>
RESOURCE_STAGING_SERVER_SSL_ENABLED.key -> resourceStagingServerSslEnabled.toString) ++
resolveSecretPath(
maybeInternalTrustStoreUri,
trustStoreSecretKey,
RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE,
"TrustStore URI") ++
resolveSecretPath(
maybeInternalClientCertUri,
clientCertSecretKey,
RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM,
"Client certificate URI") ++
maybeInternalTrustStorePassword.map { password =>
(RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD.key, password)
}.toMap ++
resourceStagingServiceSslOptions.trustStoreType.map { storeType =>
maybeInternalTrustStoreType.map { storeType =>
(RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE.key, storeType)
}.toMap
}

private def resolveSecretPath(
maybeUri: Option[String],
secretKey: String,
configEntry: OptionalConfigEntry[String],
uriType: String): Map[String, String] = {
maybeUri.map(Utils.resolveURI).map { uri =>
val resolvedPath = Option(uri.getScheme).getOrElse("file") match {
case "file" => s"$secretsVolumeMountPath/$secretKey"
case "local" => uri.getPath
case invalid => throw new SparkException(s"$uriType has invalid scheme $invalid must be" +
s" local://, file://, or empty.")
}
(configEntry.key, resolvedPath)
}.toMap
}
}
Loading

0 comments on commit 8f6f0a0

Please sign in to comment.