diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index cbaab84379db0..65c8763b55d17 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -800,6 +800,22 @@ from the other deployment modes. See the [configuration page](configuration.html
the Driver process. The user can specify multiple of these to set multiple environment variables.
+
+ spark.kubernetes.driver.secrets.[SecretName] |
+ (none) |
+
+ Mounts the Kubernetes secret named SecretName onto the path specified by the value
+ in the driver Pod. The user can specify multiple instances of this for multiple secrets.
+ |
+
+
+ spark.kubernetes.executor.secrets.[SecretName] |
+ (none) |
+
+ Mounts the Kubernetes secret named SecretName onto the path specified by the value
+ in the executor Pods. The user can specify multiple instances of this for multiple secrets.
+ |
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index 53a184cba7a4d..9dfd13e1817f8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -152,6 +152,9 @@ package object config extends Logging {
.stringConf
.createOptional
+ private[spark] val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
+ private[spark] val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
+
private[spark] val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
index b66da0b154698..1bb336fa616d0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep}
+import org.apache.spark.deploy.kubernetes.submit.submitsteps._
import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
@@ -83,6 +83,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
val allDriverLabels = driverCustomLabels ++ Map(
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+ val driverSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(
+ submissionSparkConf,
+ KUBERNETES_DRIVER_SECRETS_PREFIX,
+ "driver secrets")
+
val initialSubmissionStep = new BaseDriverConfigurationStep(
kubernetesAppId,
kubernetesResourceNamePrefix,
@@ -92,8 +97,10 @@ private[spark] class DriverConfigurationStepsOrchestrator(
mainClass,
appArgs,
submissionSparkConf)
+
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesResourceNamePrefix)
+
val pythonStep = mainAppResource match {
case PythonMainAppResource(mainPyResource) =>
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
@@ -153,17 +160,27 @@ private[spark] class DriverConfigurationStepsOrchestrator(
} else {
(filesDownloadPath, Seq.empty[DriverConfigurationStep])
}
+
val dependencyResolutionStep = new DependencyResolutionStep(
sparkJars,
sparkFiles,
jarsDownloadPath,
localFilesDownloadPath)
+
+ val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
+ val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
+ Some(new MountSecretsStep(mountSecretsBootstrap))
+ } else {
+ None
+ }
+
Seq(
initialSubmissionStep,
kubernetesCredentialsStep,
dependencyResolutionStep) ++
submittedDependenciesBootstrapSteps ++
- pythonStep.toSeq
+ pythonStep.toSeq ++
+ mountSecretsStep.toSeq
}
private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSecretsBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSecretsBootstrap.scala
new file mode 100644
index 0000000000000..ae10c9390b221
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSecretsBootstrap.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
+
+/**
+ * Bootstraps a driver or executor pod with needed secrets mounted.
+ */
+private[spark] trait MountSecretsBootstrap {
+
+ /**
+ * Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
+ *
+ * @param pod the pod into which the secret volumes are being added.
+ * @param container the container into which the secret volumes are being mounted.
+ * @return the updated pod and container with the secrets mounted.
+ */
+ def mountSecrets(pod: Pod, container: Container): (Pod, Container)
+}
+
+private[spark] class MountSecretsBootstrapImpl(
+ secretNamesToMountPaths: Map[String, String]) extends MountSecretsBootstrap {
+
+ override def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
+ var podBuilder = new PodBuilder(pod)
+ secretNamesToMountPaths.keys.foreach(name =>
+ podBuilder = podBuilder
+ .editOrNewSpec()
+ .addNewVolume()
+ .withName(secretVolumeName(name))
+ .withNewSecret()
+ .withSecretName(name)
+ .endSecret()
+ .endVolume()
+ .endSpec())
+
+ var containerBuilder = new ContainerBuilder(container)
+ secretNamesToMountPaths.foreach(namePath =>
+ containerBuilder = containerBuilder
+ .addNewVolumeMount()
+ .withName(secretVolumeName(namePath._1))
+ .withMountPath(namePath._2)
+ .endVolumeMount()
+ )
+
+ (podBuilder.build(), containerBuilder.build())
+ }
+
+ private def secretVolumeName(secretName: String): String = {
+ secretName + "-volume"
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSecretsStep.scala
new file mode 100644
index 0000000000000..d20865daba3e1
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSecretsStep.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps
+
+import org.apache.spark.deploy.kubernetes.submit.MountSecretsBootstrap
+
+/**
+ * A driver configuration step for mounting user-specified secrets onto user-specified paths.
+ *
+ * @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
+ */
+private[spark] class MountSecretsStep(
+ mountSecretsBootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
+
+ override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+ val (driverPodWithSecretsMounted, driverContainerWithSecretsMounted) =
+ mountSecretsBootstrap.mountSecrets(driverSpec.driverPod, driverSpec.driverContainer)
+ driverSpec.copy(
+ driverPod = driverPodWithSecretsMounted,
+ driverContainer = driverContainerWithSecretsMounted
+ )
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala
index 6355afa0a5041..caf1f45521da5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala
@@ -16,17 +16,16 @@
*/
package org.apache.spark.scheduler.cluster.kubernetes
-import java.util.concurrent.atomic.AtomicLong
+import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
import org.apache.commons.io.FilenameUtils
-import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap}
+import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSecretsBootstrap, MountSmallFilesBootstrap}
import org.apache.spark.util.Utils
// Configures executor pods. Construct one of these with a SparkConf to set up properties that are
@@ -45,6 +44,7 @@ private[spark] trait ExecutorPodFactory {
private[spark] class ExecutorPodFactoryImpl(
sparkConf: SparkConf,
nodeAffinityExecutorPodModifier: NodeAffinityExecutorPodModifier,
+ mountSecretsBootstrap: Option[MountSecretsBootstrap],
mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap],
executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
@@ -250,11 +250,18 @@ private[spark] class ExecutorPodFactoryImpl(
.build()
}
}.getOrElse(executorPod)
+
+ val (withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer) =
+ mountSecretsBootstrap.map {bootstrap =>
+ bootstrap.mountSecrets(withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)
+ }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer))
+
val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) =
mountSmallFilesBootstrap.map { bootstrap =>
bootstrap.mountSmallFilesSecret(
- withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)
- }.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer))
+ withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer)
+ }.getOrElse((withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer))
+
val (executorPodWithInitContainer, initBootstrappedExecutorContainer) =
executorInitContainerBootstrap.map { bootstrap =>
val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala
index f63d0aeabad3b..999824d1bc1d0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala
@@ -21,10 +21,10 @@ import java.io.File
import io.fabric8.kubernetes.client.Config
import org.apache.spark.SparkContext
-import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
+import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.MountSmallFilesBootstrapImpl
+import org.apache.spark.deploy.kubernetes.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
@@ -51,6 +51,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET)
val maybeExecutorInitContainerSecretMountPath =
sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR)
+
val executorInitContainerSecretVolumePlugin = for {
initContainerSecretName <- maybeExecutorInitContainerSecretName
initContainerSecretMountPath <- maybeExecutorInitContainerSecretMountPath
@@ -59,10 +60,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
initContainerSecretName,
initContainerSecretMountPath)
}
+
// Only set up the bootstrap if they've provided both the config map key and the config map
// name. The config map might not be provided if init-containers aren't being used to
// bootstrap dependencies.
- val executorInitContainerbootStrap = for {
+ val executorInitContainerBootstrap = for {
configMap <- maybeInitContainerConfigMap
configMapKey <- maybeInitContainerConfigMapKey
} yield {
@@ -75,12 +77,22 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
configMap,
configMapKey)
}
+
val mountSmallFilesBootstrap = for {
secretName <- maybeSubmittedFilesSecret
secretMountPath <- maybeSubmittedFilesSecretMountPath
} yield {
new MountSmallFilesBootstrapImpl(secretName, secretMountPath)
}
+
+ val executorSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(sparkConf,
+ KUBERNETES_EXECUTOR_SECRETS_PREFIX, "executor secrets")
+ val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
+ Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
+ } else {
+ None
+ }
+
if (maybeInitContainerConfigMap.isEmpty) {
logWarning("The executor's init-container config map was not specified. Executors will" +
" therefore not attempt to fetch remote or submitted dependencies.")
@@ -89,6 +101,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
logWarning("The executor's init-container config map key was not specified. Executors will" +
" therefore not attempt to fetch remote or submitted dependencies.")
}
+
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
KUBERNETES_MASTER_INTERNAL_URL,
Some(sparkConf.get(KUBERNETES_NAMESPACE)),
@@ -99,8 +112,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
val executorPodFactory = new ExecutorPodFactoryImpl(
sparkConf,
NodeAffinityExecutorPodModifierImpl,
+ mountSecretBootstrap,
mountSmallFilesBootstrap,
- executorInitContainerbootStrap,
+ executorInitContainerBootstrap,
executorInitContainerSecretVolumePlugin)
new KubernetesClusterSchedulerBackend(
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl],
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 20fe23fbcd92f..a0a7e71a66485 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -22,16 +22,14 @@ import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
-import io.fabric8.kubernetes.client.Watcher.Action
-import org.apache.commons.io.FilenameUtils
import scala.collection.{concurrent, mutable}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
import org.apache.spark.deploy.kubernetes.config._
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala
index c168e7b5407ba..6bad594629f76 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes.submit
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.config._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep}
+import org.apache.spark.deploy.kubernetes.submit.submitsteps._
private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
@@ -29,6 +29,9 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
private val APP_ARGS = Array("arg1", "arg2")
private val ADDITIONAL_PYTHON_FILES = Seq("local:///var/apps/python/py1.py")
+ private val SECRET_FOO = "foo"
+ private val SECRET_BAR = "bar"
+ private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
test("Base submission steps without an init-container or python files.") {
val sparkConf = new SparkConf(false)
@@ -116,6 +119,29 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
classOf[MountSmallLocalFilesStep])
}
+ test("Submission steps with driver secrets to mount") {
+ val sparkConf = new SparkConf(false)
+ .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
+ .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
+ val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
+ val orchestrator = new DriverConfigurationStepsOrchestrator(
+ NAMESPACE,
+ APP_ID,
+ LAUNCH_TIME,
+ mainAppResource,
+ APP_NAME,
+ MAIN_CLASS,
+ APP_ARGS,
+ ADDITIONAL_PYTHON_FILES,
+ sparkConf)
+ validateStepTypes(
+ orchestrator,
+ classOf[BaseDriverConfigurationStep],
+ classOf[DriverKubernetesCredentialsStep],
+ classOf[DependencyResolutionStep],
+ classOf[MountSecretsStep])
+ }
+
private def validateStepTypes(
orchestrator: DriverConfigurationStepsOrchestrator,
types: Class[_ <: DriverConfigurationStep]*): Unit = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/MountSecretsBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/MountSecretsBootstrapSuite.scala
new file mode 100644
index 0000000000000..a23ee667004a2
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/MountSecretsBootstrapSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
+
+import org.apache.spark.SparkFunSuite
+
+private[spark] class MountSecretsBootstrapSuite extends SparkFunSuite {
+
+ private val SECRET_FOO = "foo"
+ private val SECRET_BAR = "bar"
+ private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
+
+ test("Mounts all given secrets") {
+ val secretNamesToMountPaths = Map(
+ SECRET_FOO -> SECRET_MOUNT_PATH,
+ SECRET_BAR -> SECRET_MOUNT_PATH)
+
+ val driverContainer = new ContainerBuilder().build()
+ val driverPod = new PodBuilder().build()
+
+ val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
+ val (driverPodWithSecretsMounted, driverContainerWithSecretsMounted) =
+ mountSecretsBootstrap.mountSecrets(driverPod, driverContainer)
+ Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
+ assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName)))
+ Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
+ assert(SecretVolumeUtils.containerHasVolume(
+ driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SecretVolumeUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SecretVolumeUtils.scala
new file mode 100644
index 0000000000000..860bc6e0438aa
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SecretVolumeUtils.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{Container, Pod}
+
+private[spark] object SecretVolumeUtils {
+
+ def podHasVolume(driverPod: Pod, volumeName: String): Boolean = {
+ driverPod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
+ }
+
+ def containerHasVolume(
+ driverContainer: Container,
+ volumeName: String,
+ mountPath: String): Boolean = {
+ driverContainer.getVolumeMounts.asScala.exists(volumeMount =>
+ volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSecretsStepSuite.scala
new file mode 100644
index 0000000000000..b94e7345cd6e1
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSecretsStepSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.kubernetes.submit.{MountSecretsBootstrapImpl, SecretVolumeUtils}
+
+private[spark] class MountSecretsStepSuite extends SparkFunSuite {
+
+ private val SECRET_FOO = "foo"
+ private val SECRET_BAR = "bar"
+ private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
+
+ test("Mounts all given secrets") {
+ val baseDriverSpec = KubernetesDriverSpec.initialSpec(new SparkConf(false))
+ val secretNamesToMountPaths = Map(
+ SECRET_FOO -> SECRET_MOUNT_PATH,
+ SECRET_BAR -> SECRET_MOUNT_PATH)
+
+ val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
+ val mountSecretsStep = new MountSecretsStep(mountSecretsBootstrap)
+ val configuredDriverSpec = mountSecretsStep.configureDriver(baseDriverSpec)
+ val driverPodWithSecretsMounted = configuredDriverSpec.driverPod
+ val driverContainerWithSecretsMounted = configuredDriverSpec.driverContainer
+
+ Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
+ assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName)))
+ Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
+ assert(SecretVolumeUtils.containerHasVolume(
+ driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))
+ }
+}