diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/OptionRequirements.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/OptionRequirements.scala new file mode 100644 index 0000000000000..eda43de0a9a5b --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/OptionRequirements.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes + +private[spark] object OptionRequirements { + + def requireBothOrNeitherDefined( + opt1: Option[_], + opt2: Option[_], + errMessageWhenFirstIsMissing: String, + errMessageWhenSecondIsMissing: String): Unit = { + requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing) + requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing) + } + + def requireSecondIfFirstIsDefined( + opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = { + opt1.foreach { _ => + require(opt2.isDefined, errMessageWhenSecondIsMissing) + } + } + + def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { + opt1.foreach { _ => require(opt2.isEmpty, errMessage) } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index ab442131ad271..759a7df505829 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -362,6 +362,8 @@ package object config extends Logging { .createOptional private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer" + private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE = + "kubernetes.resourceStagingServer.internal" private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM = ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem") .doc("Certificate PEM file to use when having the resource staging server" + @@ -370,35 +372,70 @@ package object config extends Logging { .createOptional private[spark] val RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM = ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.clientCertPem") - .doc("Certificate PEM file to use when 
the client contacts the resource staging server.") + .doc("Certificate PEM file to use when the client contacts the resource staging server." + + " This must strictly be a path to a file on the submitting machine's disk.") + .stringConf + .createOptional + private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_CLIENT_CERT_PEM = + ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.clientCertPem") + .doc("Certificate PEM file to use when the init-container contacts the resource staging" + + " server. If this is not provided, it defaults to the value of" + + " spark.ssl.kubernetes.resourceStagingServer.clientCertPem. This can be a URI with" + + " a scheme of local:// which denotes that the file is pre-mounted on the init-container's" + + " disk. A uri without a scheme or a scheme of file:// will result in this file being" + + " mounted from the submitting machine's disk as a secret into the pods.") .stringConf .createOptional - private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE = ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile") - .doc("File containing the keystore password for the Kubernetes dependency server.") + .doc("File containing the keystore password for the Kubernetes resource staging server.") .stringConf .createOptional private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE = ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyPasswordFile") - .doc("File containing the key password for the Kubernetes dependency server.") + .doc("File containing the key password for the Kubernetes resource staging server.") .stringConf .createOptional private[spark] val RESOURCE_STAGING_SERVER_SSL_ENABLED = ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.enabled") - .doc("Whether or not to use SSL when communicating with the dependency server.") + .doc("Whether or not to use SSL when communicating with the resource staging server.") + .booleanConf + 
.createOptional + private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED = + ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.enabled") + .doc("Whether or not to use SSL when communicating with the resource staging server from" + + " the init-container. If this is not provided, defaults to" + + " the value of spark.ssl.kubernetes.resourceStagingServer.enabled") .booleanConf .createOptional private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE = ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStore") - .doc("File containing the trustStore to communicate with the Kubernetes dependency server.") + .doc("File containing the trustStore to communicate with the Kubernetes dependency server." + + " This must strictly be a path on the submitting machine's disk.") + .stringConf + .createOptional + private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_FILE = + ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStore") + .doc("File containing the trustStore to communicate with the Kubernetes dependency server" + + " from the init-container. If this is not provided, defaults to the value of" + + " spark.ssl.kubernetes.resourceStagingServer.trustStore. This can be a URI with a scheme" + + " of local:// indicating that the trustStore is pre-mounted on the init-container's" + + " disk. 
If no scheme, or a scheme of file:// is provided, this file is mounted from the" + + " submitting machine's disk as a Kubernetes secret into the pods.") .stringConf .createOptional private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD = ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStorePassword") - .doc("Password for the trustStore for talking to the dependency server.") + .doc("Password for the trustStore for communicating to the dependency server.") + .stringConf + .createOptional + private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_PASSWORD = + ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStorePassword") + .doc("Password for the trustStore for communicating to the dependency server from the" + + " init-container. If this is not provided, defaults to" + + " spark.ssl.kubernetes.resourceStagingServer.trustStorePassword.") .stringConf .createOptional private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE = @@ -406,11 +443,27 @@ package object config extends Logging { .doc("Type of trustStore for communicating with the dependency server.") .stringConf .createOptional + private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_TYPE = + ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStoreType") + .doc("Type of trustStore for communicating with the dependency server from the" + + " init-container. 
If this is not provided, defaults to" + + " spark.ssl.kubernetes.resourceStagingServer.trustStoreType") + .stringConf + .createOptional // Driver and Init-Container parameters for submission v2 private[spark] val RESOURCE_STAGING_SERVER_URI = ConfigBuilder("spark.kubernetes.resourceStagingServer.uri") - .doc("Base URI for the Spark resource staging server") + .doc("Base URI for the Spark resource staging server.") + .stringConf + .createOptional + + private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_URI = + ConfigBuilder("spark.kubernetes.resourceStagingServer.internal.uri") + .doc("Base URI for the Spark resource staging server when the init-containers access it for" + + " downloading resources. If this is not provided, it defaults to the value provided in" + + " spark.kubernetes.resourceStagingServer.uri, the URI that the submission client uses to" + + " upload the resources from outside the cluster.") .stringConf .createOptional diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 8d0965078aaa8..ea11ca2ec8f21 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -115,6 +115,7 @@ package object constants { private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY = "downloadSubmittedFilesSecret" private[spark] val INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY = "trustStore" + private[spark] val INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY = "ssl-certificate" private[spark] val INIT_CONTAINER_CONFIG_MAP_KEY = "download-submitted-files" private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume" private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files" diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverInitContainerComponentsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverInitContainerComponentsProvider.scala index 7f6ae2ec47675..0a5e6cd216011 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverInitContainerComponentsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverInitContainerComponentsProvider.scala @@ -17,10 +17,11 @@ package org.apache.spark.deploy.kubernetes.submit.v2 import org.apache.spark.{SparkConf, SSLOptions} -import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl} +import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.rest.kubernetes.v2.RetrofitClientFactoryImpl +import org.apache.spark.util.Utils /** * Interface that wraps the provision of everything the submission client needs to set up the @@ -47,10 +48,51 @@ private[spark] class DriverInitContainerComponentsProviderImpl( kubernetesAppId: String, sparkJars: Seq[String], sparkFiles: Seq[String], - resourceStagingServerSslOptions: SSLOptions) + resourceStagingServerExternalSslOptions: SSLOptions) extends DriverInitContainerComponentsProvider { private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI) + private val maybeResourceStagingServerInternalUri = + sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI) + private val maybeResourceStagingServerInternalTrustStore = + 
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_FILE) + .orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE)) + private val maybeResourceStagingServerInternalTrustStorePassword = + sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_PASSWORD) + .orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD)) + private val maybeResourceStagingServerInternalTrustStoreType = + sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_TYPE) + .orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE)) + private val maybeResourceStagingServerInternalClientCert = + sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_CLIENT_CERT_PEM) + .orElse(sparkConf.get(RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM)) + private val resourceStagingServerInternalSslEnabled = + sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED) + .orElse(sparkConf.get(RESOURCE_STAGING_SERVER_SSL_ENABLED)) + .getOrElse(false) + + OptionRequirements.requireNandDefined( + maybeResourceStagingServerInternalClientCert, + maybeResourceStagingServerInternalTrustStore, + "Cannot provide both a certificate file and a trustStore file for init-containers to" + + " use for contacting the resource staging server over TLS.") + + require(maybeResourceStagingServerInternalTrustStore.forall { trustStore => + Option(Utils.resolveURI(trustStore).getScheme).getOrElse("file") match { + case "file" | "local" => true + case _ => false + } + }, "TrustStore URI used for contacting the resource staging server from init containers must" + + " have no scheme, or scheme file://, or scheme local://.") + + require(maybeResourceStagingServerInternalClientCert.forall { trustStore => + Option(Utils.resolveURI(trustStore).getScheme).getOrElse("file") match { + case "file" | "local" => true + case _ => false + } + }, "Client cert file URI used for contacting the resource staging server from init containers" + + " must have no scheme, or scheme file://, or scheme local://.") + private val jarsDownloadPath = 
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION) private val maybeSecretName = maybeResourceStagingServerUri.map { _ => @@ -71,14 +113,20 @@ private[spark] class DriverInitContainerComponentsProviderImpl( filesResourceId <- maybeSubmittedResourceIds.map(_.filesResourceId) } yield { new SubmittedDependencyInitContainerConfigPluginImpl( - stagingServerUri, + // Configure the init-container with the internal URI over the external URI. + maybeResourceStagingServerInternalUri.getOrElse(stagingServerUri), jarsResourceId, filesResourceId, INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY, INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY, INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY, - INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH, - resourceStagingServerSslOptions) + INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY, + resourceStagingServerInternalSslEnabled, + maybeResourceStagingServerInternalTrustStore, + maybeResourceStagingServerInternalClientCert, + maybeResourceStagingServerInternalTrustStorePassword, + maybeResourceStagingServerInternalTrustStoreType, + INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH) } new SparkInitContainerConfigMapBuilderImpl( sparkJars, @@ -113,7 +161,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( stagingServerUri, sparkJars, sparkFiles, - resourceStagingServerSslOptions, + resourceStagingServerExternalSslOptions, RetrofitClientFactoryImpl) } } @@ -133,7 +181,9 @@ private[spark] class DriverInitContainerComponentsProviderImpl( INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY, INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY, INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY, - resourceStagingServerSslOptions) + INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY, + maybeResourceStagingServerInternalTrustStore, + maybeResourceStagingServerInternalClientCert) } } diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyInitContainerConfigPlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyInitContainerConfigPlugin.scala index bc9abc4eaba81..1b086e60d3d0d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyInitContainerConfigPlugin.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyInitContainerConfigPlugin.scala @@ -16,9 +16,10 @@ */ package org.apache.spark.deploy.kubernetes.submit.v2 -import org.apache.spark.SSLOptions +import org.apache.spark.SparkException import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.internal.config.OptionalConfigEntry +import org.apache.spark.util.Utils private[spark] trait SubmittedDependencyInitContainerConfigPlugin { /** @@ -34,36 +35,62 @@ private[spark] trait SubmittedDependencyInitContainerConfigPlugin { } private[spark] class SubmittedDependencyInitContainerConfigPluginImpl( - resourceStagingServerUri: String, + internalResourceStagingServerUri: String, jarsResourceId: String, filesResourceId: String, jarsSecretKey: String, filesSecretKey: String, trustStoreSecretKey: String, - secretsVolumeMountPath: String, - resourceStagingServiceSslOptions: SSLOptions) + clientCertSecretKey: String, + resourceStagingServerSslEnabled: Boolean, + maybeInternalTrustStoreUri: Option[String], + maybeInternalClientCertUri: Option[String], + maybeInternalTrustStorePassword: Option[String], + maybeInternalTrustStoreType: Option[String], + secretsVolumeMountPath: String) extends SubmittedDependencyInitContainerConfigPlugin { override def configurationsToFetchSubmittedDependencies(): Map[String, String] = { Map[String, String]( - 
RESOURCE_STAGING_SERVER_URI.key -> resourceStagingServerUri, + RESOURCE_STAGING_SERVER_URI.key -> internalResourceStagingServerUri, INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER.key -> jarsResourceId, INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION.key -> s"$secretsVolumeMountPath/$jarsSecretKey", INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER.key -> filesResourceId, INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION.key -> s"$secretsVolumeMountPath/$filesSecretKey", - RESOURCE_STAGING_SERVER_SSL_ENABLED.key -> - resourceStagingServiceSslOptions.enabled.toString) ++ - resourceStagingServiceSslOptions.trustStore.map { _ => - (RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE.key, - s"$secretsVolumeMountPath/$trustStoreSecretKey") - }.toMap ++ - resourceStagingServiceSslOptions.trustStorePassword.map { password => + RESOURCE_STAGING_SERVER_SSL_ENABLED.key -> resourceStagingServerSslEnabled.toString) ++ + resolveSecretPath( + maybeInternalTrustStoreUri, + trustStoreSecretKey, + RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE, + "TrustStore URI") ++ + resolveSecretPath( + maybeInternalClientCertUri, + clientCertSecretKey, + RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM, + "Client certificate URI") ++ + maybeInternalTrustStorePassword.map { password => (RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD.key, password) }.toMap ++ - resourceStagingServiceSslOptions.trustStoreType.map { storeType => + maybeInternalTrustStoreType.map { storeType => (RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE.key, storeType) }.toMap } + + private def resolveSecretPath( + maybeUri: Option[String], + secretKey: String, + configEntry: OptionalConfigEntry[String], + uriType: String): Map[String, String] = { + maybeUri.map(Utils.resolveURI).map { uri => + val resolvedPath = Option(uri.getScheme).getOrElse("file") match { + case "file" => s"$secretsVolumeMountPath/$secretKey" + case "local" => uri.getPath + case invalid => throw new SparkException(s"$uriType has invalid scheme $invalid; must be" + + s" local://, file://, or empty.") 
+ } + (configEntry.key, resolvedPath) + }.toMap + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilder.scala index b8fa43d0573f7..1a33757e45aa0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilder.scala @@ -16,12 +16,14 @@ */ package org.apache.spark.deploy.kubernetes.submit.v2 +import java.io.File + import com.google.common.base.Charsets import com.google.common.io.{BaseEncoding, Files} import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder} import scala.collection.JavaConverters._ -import org.apache.spark.SSLOptions +import org.apache.spark.util.Utils private[spark] trait SubmittedDependencySecretBuilder { /** @@ -32,28 +34,30 @@ private[spark] trait SubmittedDependencySecretBuilder { } private[spark] class SubmittedDependencySecretBuilderImpl( - secretName: String, - jarsResourceSecret: String, - filesResourceSecret: String, - jarsSecretKey: String, - filesSecretKey: String, - trustStoreSecretKey: String, - resourceStagingServerSslOptions: SSLOptions) + secretName: String, + jarsResourceSecret: String, + filesResourceSecret: String, + jarsSecretKey: String, + filesSecretKey: String, + trustStoreSecretKey: String, + clientCertSecretKey: String, + internalTrustStoreUri: Option[String], + internalClientCertUri: Option[String]) extends SubmittedDependencySecretBuilder { override def build(): Secret = { - val trustStoreBase64 = resourceStagingServerSslOptions.trustStore.map { trustStoreFile => - require(trustStoreFile.isFile, "Dependency server trustStore provided at" + - trustStoreFile.getAbsolutePath + " does not 
exist or is not a file.") - (trustStoreSecretKey, BaseEncoding.base64().encode(Files.toByteArray(trustStoreFile))) - }.toMap + val trustStoreBase64 = convertFileToBase64IfSubmitterLocal( + trustStoreSecretKey, internalTrustStoreUri) + val clientCertBase64 = convertFileToBase64IfSubmitterLocal( + clientCertSecretKey, internalClientCertUri) val jarsSecretBase64 = BaseEncoding.base64().encode(jarsResourceSecret.getBytes(Charsets.UTF_8)) val filesSecretBase64 = BaseEncoding.base64().encode( filesResourceSecret.getBytes(Charsets.UTF_8)) val secretData = Map( jarsSecretKey -> jarsSecretBase64, filesSecretKey -> filesSecretBase64) ++ - trustStoreBase64 + trustStoreBase64 ++ + clientCertBase64 val kubernetesSecret = new SecretBuilder() .withNewMetadata() .withName(secretName) @@ -62,4 +66,16 @@ private[spark] class SubmittedDependencySecretBuilderImpl( .build() kubernetesSecret } + + private def convertFileToBase64IfSubmitterLocal(secretKey: String, secretUri: Option[String]) + : Map[String, String] = { + secretUri.filter { trustStore => + Option(Utils.resolveURI(trustStore).getScheme).getOrElse("file") == "file" + }.map { uri => + val file = new File(Utils.resolveURI(uri).getPath) + require(file.isFile, "Dependency server SSL file provided at " + + file.getAbsolutePath + " does not exist or is not a file.") + (secretKey, BaseEncoding.base64().encode(Files.toByteArray(file))) + }.toMap + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala index 6b88426d00e72..0dd0b08433def 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala @@ -23,7 +23,8 @@ import com.google.common.base.Charsets import com.google.common.io.Files import org.apache.commons.lang3.RandomStringUtils -import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions} +import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions} +import org.apache.spark.deploy.kubernetes.OptionRequirements import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.rest.kubernetes.v1.PemsToKeyStoreConverter import org.apache.spark.internal.Logging @@ -38,7 +39,7 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar private val SECURE_RANDOM = new SecureRandom() def getSslOptions: SSLOptions = { - val baseSslOptions = new SparkSecurityManager(sparkConf) + val baseSslOptions = new SecurityManager(sparkConf) .getSSLOptions(RESOURCE_STAGING_SERVER_SSL_NAMESPACE) val maybeKeyPem = sparkConf.get(RESOURCE_STAGING_SERVER_KEY_PEM) val maybeServerCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM) @@ -47,39 +48,47 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar val maybeClientCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM) logSslConfigurations( - baseSslOptions, - maybeKeyPem, - maybeServerCertPem, - maybeKeyStorePasswordFile, - maybeKeyPasswordFile, - maybeClientCertPem) - - requireNandDefined(baseSslOptions.keyStore, maybeKeyPem, - "Shouldn't provide both key PEM and keyStore files for TLS.") - requireNandDefined(baseSslOptions.keyStore, maybeServerCertPem, - "Shouldn't provide both certificate PEM and keyStore files for TLS.") - requireNandDefined(baseSslOptions.keyStorePassword, maybeKeyStorePasswordFile, - "Shouldn't provide both the keyStore password value and the keyStore password file.") - requireNandDefined(baseSslOptions.keyPassword, 
maybeKeyPasswordFile, - "Shouldn't provide both the keyStore key password value and the keyStore key password file.") - requireBothOrNeitherDefined( - maybeKeyPem, - maybeServerCertPem, - "When providing a certificate PEM file, the key PEM file must also be provided.", - "When providing a key PEM file, the certificate PEM file must also be provided.") - requireNandDefined(baseSslOptions.trustStore, maybeClientCertPem, - "Shouldn't provide both the trustStore and a client certificate PEM file.") + baseSslOptions, + maybeKeyPem, + maybeServerCertPem, + maybeKeyStorePasswordFile, + maybeKeyPasswordFile, + maybeClientCertPem) + + OptionRequirements.requireNandDefined( + baseSslOptions.keyStore, + maybeKeyPem, + "Shouldn't provide both key PEM and keyStore files for TLS.") + OptionRequirements.requireNandDefined( + baseSslOptions.keyStore, + maybeServerCertPem, + "Shouldn't provide both certificate PEM and keyStore files for TLS.") + OptionRequirements.requireNandDefined( + baseSslOptions.keyStorePassword, + maybeKeyStorePasswordFile, + "Shouldn't provide both the keyStore password value and the keyStore password file.") + OptionRequirements.requireNandDefined( + baseSslOptions.keyPassword, + maybeKeyPasswordFile, + "Shouldn't provide both a keyStore key password value and a keyStore key password file.") + OptionRequirements.requireBothOrNeitherDefined( + maybeKeyPem, + maybeServerCertPem, + "When providing a certificate PEM file, the key PEM file must also be provided.", + "When providing a key PEM file, the certificate PEM file must also be provided.") + OptionRequirements.requireNandDefined(baseSslOptions.trustStore, maybeClientCertPem, + "Shouldn't provide both the trustStore and a client certificate PEM file.") val resolvedKeyStorePassword = baseSslOptions.keyStorePassword - .orElse(maybeKeyStorePasswordFile.map { keyStorePasswordFile => - safeFileToString(keyStorePasswordFile, "KeyStore password file") - }) - .orElse(maybeKeyPem.map { _ => randomPassword()}) + 
.orElse(maybeKeyStorePasswordFile.map { keyStorePasswordFile => + safeFileToString(keyStorePasswordFile, "KeyStore password file") + }) + .orElse(maybeKeyPem.map { _ => randomPassword()}) val resolvedKeyStoreKeyPassword = baseSslOptions.keyPassword - .orElse(maybeKeyPasswordFile.map { keyPasswordFile => - safeFileToString(keyPasswordFile, "KeyStore key password file") - }) - .orElse(maybeKeyPem.map { _ => randomPassword()}) + .orElse(maybeKeyPasswordFile.map { keyPasswordFile => + safeFileToString(keyPasswordFile, "KeyStore key password file") + }) + .orElse(maybeKeyPem.map { _ => randomPassword()}) val resolvedKeyStore = baseSslOptions.keyStore.orElse { for { keyPem <- maybeKeyPem @@ -90,16 +99,16 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar val keyPemFile = new File(keyPem) val certPemFile = new File(certPem) PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile( - keyPemFile, - certPemFile, - "key", - keyStorePassword, - keyPassword, - baseSslOptions.keyStoreType) + keyPemFile, + certPemFile, + "key", + keyStorePassword, + keyPassword, + baseSslOptions.keyStoreType) } } val resolvedTrustStorePassword = baseSslOptions.trustStorePassword - .orElse(maybeClientCertPem.map( _ => "defaultTrustStorePassword")) + .orElse(maybeClientCertPem.map( _ => "defaultTrustStorePassword")) val resolvedTrustStore = baseSslOptions.trustStore.orElse { for { clientCertPem <- maybeClientCertPem @@ -107,16 +116,16 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar } yield { val certPemFile = new File(clientCertPem) PemsToKeyStoreConverter.convertCertPemToTempTrustStoreFile( - certPemFile, - trustStorePassword, - baseSslOptions.trustStoreType) + certPemFile, + trustStorePassword, + baseSslOptions.trustStoreType) } } baseSslOptions.copy( - keyStore = resolvedKeyStore, - keyStorePassword = resolvedKeyStorePassword, - keyPassword = resolvedKeyStoreKeyPassword, - trustStore = resolvedTrustStore) + keyStore = 
resolvedKeyStore, + keyStorePassword = resolvedKeyStorePassword, + keyPassword = resolvedKeyStoreKeyPassword, + trustStore = resolvedTrustStore) } private def logSslConfigurations( @@ -140,26 +149,6 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar logDebug(s"Client-side certificate PEM: ${maybeClientCertPem.getOrElse("N/A")}") } - private def requireBothOrNeitherDefined( - opt1: Option[_], - opt2: Option[_], - errMessageWhenFirstIsMissing: String, - errMessageWhenSecondIsMissing: String): Unit = { - requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing) - requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing) - } - - private def requireSecondIfFirstIsDefined( - opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = { - opt1.foreach { _ => - require(opt2.isDefined, errMessageWhenSecondIsMissing) - } - } - - private def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { - opt1.foreach { _ => require(opt2.isEmpty, errMessage) } - } - private def safeFileToString(filePath: String, fileType: String): String = { val file = new File(filePath) if (!file.isFile) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 0dd875b307a6d..5627f7c20de3d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -322,7 +322,8 @@ private[spark] class KubernetesClusterSchedulerBackend( .addToRequests("cpu", executorCpuQuantity) .addToLimits("cpu", executorCpuQuantity) .endResources() - 
.withEnv(requiredEnv.asJava) + .addAllToEnv(requiredEnv.asJava) + .addToEnv(executorExtraClasspathEnv.toSeq: _*) .withPorts(requiredPorts.asJava) .endContainer() .endSpec() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyInitContainerConfigPluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyInitContainerConfigPluginSuite.scala index 11a671085c201..09b41dc1bcaaf 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyInitContainerConfigPluginSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyInitContainerConfigPluginSuite.scala @@ -23,20 +23,18 @@ import org.apache.spark.deploy.kubernetes.config._ class SubmittedDependencyInitContainerConfigPluginSuite extends SparkFunSuite { private val STAGING_SERVER_URI = "http://localhost:9000" + private val STAGING_SERVER_INTERNAL_URI = "http://internalHost:9000" private val JARS_RESOURCE_ID = "jars-id" private val FILES_RESOURCE_ID = "files-id" private val JARS_SECRET_KEY = "jars" private val FILES_SECRET_KEY = "files" private val TRUSTSTORE_SECRET_KEY = "trustStore" - private val SECRETS_VOLUME_MOUNT_PATH = "/var/data/" + private val CLIENT_CERT_SECRET_KEY = "client-cert" + private val SECRETS_VOLUME_MOUNT_PATH = "/var/data" private val TRUSTSTORE_PASSWORD = "trustStore" private val TRUSTSTORE_FILE = "/mnt/secrets/trustStore.jks" + private val CLIENT_CERT_URI = "local:///mnt/secrets/client-cert.pem" private val TRUSTSTORE_TYPE = "jks" - private val RESOURCE_STAGING_SERVICE_SSL_OPTIONS = SSLOptions( - enabled = true, - trustStore = Some(new File(TRUSTSTORE_FILE)), - trustStorePassword = Some(TRUSTSTORE_PASSWORD), - trustStoreType = Some(TRUSTSTORE_TYPE)) test("Plugin should provide configuration for fetching uploaded 
dependencies") { val configPluginUnderTest = new SubmittedDependencyInitContainerConfigPluginImpl( @@ -46,8 +44,13 @@ class SubmittedDependencyInitContainerConfigPluginSuite extends SparkFunSuite { JARS_SECRET_KEY, FILES_SECRET_KEY, TRUSTSTORE_SECRET_KEY, - SECRETS_VOLUME_MOUNT_PATH, - SSLOptions()) + CLIENT_CERT_SECRET_KEY, + false, + None, + None, + None, + None, + SECRETS_VOLUME_MOUNT_PATH) val addedConfigurations = configPluginUnderTest.configurationsToFetchSubmittedDependencies() val expectedConfigurations = Map( RESOURCE_STAGING_SERVER_URI.key -> STAGING_SERVER_URI, @@ -65,19 +68,24 @@ class SubmittedDependencyInitContainerConfigPluginSuite extends SparkFunSuite { val configPluginUnderTest = new SubmittedDependencyInitContainerConfigPluginImpl( STAGING_SERVER_URI, JARS_RESOURCE_ID, - FILES_RESOURCE_ID, - JARS_SECRET_KEY, + FILES_RESOURCE_ID, JARS_SECRET_KEY, FILES_SECRET_KEY, TRUSTSTORE_SECRET_KEY, - SECRETS_VOLUME_MOUNT_PATH, - RESOURCE_STAGING_SERVICE_SSL_OPTIONS) + CLIENT_CERT_SECRET_KEY, + true, + Some(TRUSTSTORE_FILE), + Some(CLIENT_CERT_URI), + Some(TRUSTSTORE_PASSWORD), + Some(TRUSTSTORE_TYPE), + SECRETS_VOLUME_MOUNT_PATH) val addedConfigurations = configPluginUnderTest.configurationsToFetchSubmittedDependencies() val expectedSslConfigurations = Map( RESOURCE_STAGING_SERVER_SSL_ENABLED.key -> "true", RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE.key -> s"$SECRETS_VOLUME_MOUNT_PATH/$TRUSTSTORE_SECRET_KEY", RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD.key -> TRUSTSTORE_PASSWORD, - RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE.key -> TRUSTSTORE_TYPE) + RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE.key -> TRUSTSTORE_TYPE, + RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM.key -> "/mnt/secrets/client-cert.pem") assert(expectedSslConfigurations.toSet.subsetOf(addedConfigurations.toSet)) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilderSuite.scala 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilderSuite.scala index 189d87e27a28a..358edbecf8708 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilderSuite.scala @@ -35,7 +35,9 @@ class SubmittedDependencySecretBuilderSuite extends SparkFunSuite { private val JARS_SECRET_KEY = "jars-secret-key" private val FILES_SECRET_KEY = "files-secret-key" private val TRUSTSTORE_SECRET_KEY = "truststore-secret-key" + private val CLIENT_CERT_SECRET_KEY = "client-cert" private val TRUSTSTORE_STRING_CONTENTS = "trustStore-contents" + private val CLIENT_CERT_STRING_CONTENTS = "client-certificate-contents" test("Building the secret without a trustStore") { val builder = new SubmittedDependencySecretBuilderImpl( @@ -45,7 +47,9 @@ class SubmittedDependencySecretBuilderSuite extends SparkFunSuite { JARS_SECRET_KEY, FILES_SECRET_KEY, TRUSTSTORE_SECRET_KEY, - SSLOptions()) + CLIENT_CERT_SECRET_KEY, + None, + None) val secret = builder.build() assert(secret.getMetadata.getName === SECRET_NAME) val secretDecodedData = decodeSecretData(secret) @@ -60,10 +64,12 @@ class SubmittedDependencySecretBuilderSuite extends SparkFunSuite { } test("Building the secret with a trustStore") { - val tempTrustStoreDir = Utils.createTempDir(namePrefix = "temp-truststores") + val tempSslDir = Utils.createTempDir(namePrefix = "temp-ssl-tests") try { - val trustStoreFile = new File(tempTrustStoreDir, "trustStore.jks") + val trustStoreFile = new File(tempSslDir, "trustStore.jks") Files.write(TRUSTSTORE_STRING_CONTENTS, trustStoreFile, Charsets.UTF_8) + val clientCertFile = new File(tempSslDir, "cert.pem") + Files.write(CLIENT_CERT_STRING_CONTENTS, clientCertFile, Charsets.UTF_8) val builder = new 
SubmittedDependencySecretBuilderImpl( SECRET_NAME, JARS_SECRET, @@ -71,13 +77,33 @@ class SubmittedDependencySecretBuilderSuite extends SparkFunSuite { JARS_SECRET_KEY, FILES_SECRET_KEY, TRUSTSTORE_SECRET_KEY, - SSLOptions(trustStore = Some(trustStoreFile))) + CLIENT_CERT_SECRET_KEY, + Some(trustStoreFile.getAbsolutePath), + Some(clientCertFile.getAbsolutePath)) val secret = builder.build() - val secretDecodedData = decodeSecretData(secret) - assert(secretDecodedData(TRUSTSTORE_SECRET_KEY) === TRUSTSTORE_STRING_CONTENTS) + val decodedSecretData = decodeSecretData(secret) + assert(decodedSecretData(TRUSTSTORE_SECRET_KEY) === TRUSTSTORE_STRING_CONTENTS) + assert(decodedSecretData(CLIENT_CERT_SECRET_KEY) === CLIENT_CERT_STRING_CONTENTS) } finally { - tempTrustStoreDir.delete() + tempSslDir.delete() } } + test("If trustStore and certificate are container-local, don't add secret entries") { + val builder = new SubmittedDependencySecretBuilderImpl( + SECRET_NAME, + JARS_SECRET, + FILES_SECRET, + JARS_SECRET_KEY, + FILES_SECRET_KEY, + TRUSTSTORE_SECRET_KEY, + CLIENT_CERT_SECRET_KEY, + Some("local:///mnt/secrets/trustStore.jks"), + Some("local:///mnt/secrets/cert.pem")) + val secret = builder.build() + val decodedSecretData = decodeSecretData(secret) + assert(!decodedSecretData.contains(TRUSTSTORE_SECRET_KEY)) + assert(!decodedSecretData.contains(CLIENT_CERT_SECRET_KEY)) + } + }