Added configuration properties to inject arbitrary secrets into the driver/executors (#479)

* Added configuration properties to inject arbitrary secrets into the driver/executors

* Addressed comments
liyinan926 authored and mccheah committed Sep 26, 2017
1 parent b61f495 commit f28cb17
Showing 12 changed files with 331 additions and 18 deletions.
16 changes: 16 additions & 0 deletions docs/running-on-kubernetes.md
@@ -800,6 +800,22 @@ from the other deployment modes. See the [configuration page](configuration.html
the Driver process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.secrets.[SecretName]</code></td>
<td>(none)</td>
<td>
Mounts the Kubernetes secret named <code>SecretName</code> into the driver pod, at the path
specified by the value. The user can specify multiple instances of this for multiple secrets.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.secrets.[SecretName]</code></td>
<td>(none)</td>
<td>
Mounts the Kubernetes secret named <code>SecretName</code> into the executor pods, at the path
specified by the value. The user can specify multiple instances of this for multiple secrets.
</td>
</tr>
</table>
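For example, to mount an existing Kubernetes secret named oauth-token at /etc/secrets in both the driver and the executor pods (the secret name and mount path here are illustrative, not values from this commit), the submission configuration could set both properties:

import org.apache.spark.SparkConf

// "oauth-token" is assumed to be a secret that already exists in the
// application's namespace; "/etc/secrets" is an arbitrary container path.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.secrets.oauth-token", "/etc/secrets")
  .set("spark.kubernetes.executor.secrets.oauth-token", "/etc/secrets")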


org/apache/spark/deploy/kubernetes/config.scala
@@ -152,6 +152,9 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
private[spark] val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."

private[spark] val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod.")
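Both prefixes are consumed by ConfigurationUtils.parsePrefixedKeyValuePairs, which is not among the files loaded on this page. A minimal sketch of the assumed semantics, built on SparkConf.getAllWithPrefix:

import org.apache.spark.SparkConf

// Sketch only: the real helper lives in ConfigurationUtils and may add
// validation and logging; configType is assumed to be used only in error
// messages. getAllWithPrefix returns keys with the prefix stripped, so
// setting spark.kubernetes.driver.secrets.foo=/etc/secrets/driver
// yields Map("foo" -> "/etc/secrets/driver").
def parsePrefixedKeyValuePairs(
    sparkConf: SparkConf,
    prefix: String,
    configType: String): Map[String, String] = {
  sparkConf.getAllWithPrefix(prefix).toMap
}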
org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep}
+import org.apache.spark.deploy.kubernetes.submit.submitsteps._
import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
@@ -83,6 +83,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
val allDriverLabels = driverCustomLabels ++ Map(
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
val driverSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf,
KUBERNETES_DRIVER_SECRETS_PREFIX,
"driver secrets")

val initialSubmissionStep = new BaseDriverConfigurationStep(
kubernetesAppId,
kubernetesResourceNamePrefix,
@@ -92,8 +97,10 @@
mainClass,
appArgs,
submissionSparkConf)

val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesResourceNamePrefix)

val pythonStep = mainAppResource match {
case PythonMainAppResource(mainPyResource) =>
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
@@ -153,17 +160,27 @@ private[spark] class DriverConfigurationStepsOrchestrator(
} else {
(filesDownloadPath, Seq.empty[DriverConfigurationStep])
}

val dependencyResolutionStep = new DependencyResolutionStep(
sparkJars,
sparkFiles,
jarsDownloadPath,
localFilesDownloadPath)

val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
Some(new MountSecretsStep(mountSecretsBootstrap))
} else {
None
}

Seq(
initialSubmissionStep,
kubernetesCredentialsStep,
dependencyResolutionStep) ++
submittedDependenciesBootstrapSteps ++
-pythonStep.toSeq
+pythonStep.toSeq ++
+mountSecretsStep.toSeq
}

private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = {
org/apache/spark/deploy/kubernetes/submit/MountSecretsBootstrap.scala (new file)
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

/**
* Bootstraps a driver or executor pod with needed secrets mounted.
*/
private[spark] trait MountSecretsBootstrap {

/**
* Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
*
* @param pod the pod into which the secret volumes are being added.
* @param container the container into which the secret volumes are being mounted.
* @return the updated pod and container with the secrets mounted.
*/
def mountSecrets(pod: Pod, container: Container): (Pod, Container)
}

private[spark] class MountSecretsBootstrapImpl(
secretNamesToMountPaths: Map[String, String]) extends MountSecretsBootstrap {

override def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
var podBuilder = new PodBuilder(pod)
secretNamesToMountPaths.keys.foreach(name =>
podBuilder = podBuilder
.editOrNewSpec()
.addNewVolume()
.withName(secretVolumeName(name))
.withNewSecret()
.withSecretName(name)
.endSecret()
.endVolume()
.endSpec())

var containerBuilder = new ContainerBuilder(container)
secretNamesToMountPaths.foreach(namePath =>
containerBuilder = containerBuilder
.addNewVolumeMount()
.withName(secretVolumeName(namePath._1))
.withMountPath(namePath._2)
.endVolumeMount()
)

(podBuilder.build(), containerBuilder.build())
}

private def secretVolumeName(secretName: String): String = {
secretName + "-volume"
}
}
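A standalone usage sketch of the bootstrap; the pod, container, and secret names below are placeholders rather than values from this commit:

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

val bootstrap = new MountSecretsBootstrapImpl(Map("db-password" -> "/etc/secrets"))
val pod = new PodBuilder().withNewMetadata().withName("example-pod").endMetadata().build()
val container = new ContainerBuilder().withName("example-container").build()
// The returned pod gains a volume named "db-password-volume" backed by the
// secret, and the returned container mounts that volume at /etc/secrets.
val (podWithSecret, containerWithSecret) = bootstrap.mountSecrets(pod, container)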
org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSecretsStep.scala (new file)
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.submitsteps

import org.apache.spark.deploy.kubernetes.submit.MountSecretsBootstrap

/**
* A driver configuration step for mounting user-specified secrets onto user-specified paths.
*
* @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
*/
private[spark] class MountSecretsStep(
mountSecretsBootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val (driverPodWithSecretsMounted, driverContainerWithSecretsMounted) =
mountSecretsBootstrap.mountSecrets(driverSpec.driverPod, driverSpec.driverContainer)
driverSpec.copy(
driverPod = driverPodWithSecretsMounted,
driverContainer = driverContainerWithSecretsMounted
)
}
}
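Like the other driver configuration steps, this one is assumed to be applied by folding the step sequence over an initial KubernetesDriverSpec; a sketch of that driving loop, which lives in the submission client rather than in this diff:

// Sketch only: the actual submission client may differ in details.
def applySteps(
    initialSpec: KubernetesDriverSpec,
    steps: Seq[DriverConfigurationStep]): KubernetesDriverSpec = {
  steps.foldLeft(initialSpec)((spec, step) => step.configureDriver(spec))
}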
org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala
@@ -16,17 +16,16 @@
*/
package org.apache.spark.scheduler.cluster.kubernetes

-import java.util.concurrent.atomic.AtomicLong
+import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
import org.apache.commons.io.FilenameUtils
-import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap}
+import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSecretsBootstrap, MountSmallFilesBootstrap}
import org.apache.spark.util.Utils

// Configures executor pods. Construct one of these with a SparkConf to set up properties that are
@@ -45,6 +44,7 @@ private[spark] trait ExecutorPodFactory {
private[spark] class ExecutorPodFactoryImpl(
sparkConf: SparkConf,
nodeAffinityExecutorPodModifier: NodeAffinityExecutorPodModifier,
mountSecretsBootstrap: Option[MountSecretsBootstrap],
mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap],
executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
@@ -250,11 +250,18 @@ private[spark] class ExecutorPodFactoryImpl(
.build()
}
}.getOrElse(executorPod)

val (withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer) =
mountSecretsBootstrap.map { bootstrap =>
bootstrap.mountSecrets(withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)
}.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer))

val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) =
mountSmallFilesBootstrap.map { bootstrap =>
bootstrap.mountSmallFilesSecret(
-withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)
-}.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer))
+withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer)
+}.getOrElse((withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer))

val (executorPodWithInitContainer, initBootstrappedExecutorContainer) =
executorInitContainerBootstrap.map { bootstrap =>
val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes(
org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala
@@ -21,10 +21,10 @@ import java.io.File
import io.fabric8.kubernetes.client.Config

import org.apache.spark.SparkContext
-import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
+import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.MountSmallFilesBootstrapImpl
+import org.apache.spark.deploy.kubernetes.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

@@ -51,6 +51,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET)
val maybeExecutorInitContainerSecretMountPath =
sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR)

val executorInitContainerSecretVolumePlugin = for {
initContainerSecretName <- maybeExecutorInitContainerSecretName
initContainerSecretMountPath <- maybeExecutorInitContainerSecretMountPath
@@ -59,10 +60,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
initContainerSecretName,
initContainerSecretMountPath)
}

// Only set up the bootstrap if they've provided both the config map key and the config map
// name. The config map might not be provided if init-containers aren't being used to
// bootstrap dependencies.
-val executorInitContainerbootStrap = for {
+val executorInitContainerBootstrap = for {
configMap <- maybeInitContainerConfigMap
configMapKey <- maybeInitContainerConfigMapKey
} yield {
@@ -75,12 +77,22 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
configMap,
configMapKey)
}

val mountSmallFilesBootstrap = for {
secretName <- maybeSubmittedFilesSecret
secretMountPath <- maybeSubmittedFilesSecretMountPath
} yield {
new MountSmallFilesBootstrapImpl(secretName, secretMountPath)
}

val executorSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(sparkConf,
KUBERNETES_EXECUTOR_SECRETS_PREFIX, "executor secrets")
val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
} else {
None
}

if (maybeInitContainerConfigMap.isEmpty) {
logWarning("The executor's init-container config map was not specified. Executors will" +
" therefore not attempt to fetch remote or submitted dependencies.")
@@ -89,6 +101,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
logWarning("The executor's init-container config map key was not specified. Executors will" +
" therefore not attempt to fetch remote or submitted dependencies.")
}

val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
KUBERNETES_MASTER_INTERNAL_URL,
Some(sparkConf.get(KUBERNETES_NAMESPACE)),
@@ -99,8 +112,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
val executorPodFactory = new ExecutorPodFactoryImpl(
sparkConf,
NodeAffinityExecutorPodModifierImpl,
mountSecretBootstrap,
mountSmallFilesBootstrap,
-executorInitContainerbootStrap,
+executorInitContainerBootstrap,
executorInitContainerSecretVolumePlugin)
new KubernetesClusterSchedulerBackend(
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl],
org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -22,16 +22,14 @@ import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}

-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.io.FilenameUtils
import scala.collection.{concurrent, mutable}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

-import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
-import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
import org.apache.spark.deploy.kubernetes.config._
org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes.submit

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.config._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep}
+import org.apache.spark.deploy.kubernetes.submit.submitsteps._

private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {

@@ -29,6 +29,9 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
private val APP_ARGS = Array("arg1", "arg2")
private val ADDITIONAL_PYTHON_FILES = Seq("local:///var/apps/python/py1.py")
private val SECRET_FOO = "foo"
private val SECRET_BAR = "bar"
private val SECRET_MOUNT_PATH = "/etc/secrets/driver"

test("Base submission steps without an init-container or python files.") {
val sparkConf = new SparkConf(false)
@@ -116,6 +119,29 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
classOf[MountSmallLocalFilesStep])
}

test("Submission steps with driver secrets to mount") {
val sparkConf = new SparkConf(false)
.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
val orchestrator = new DriverConfigurationStepsOrchestrator(
NAMESPACE,
APP_ID,
LAUNCH_TIME,
mainAppResource,
APP_NAME,
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
sparkConf)
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[MountSecretsStep])
}

private def validateStepTypes(
orchestrator: DriverConfigurationStepsOrchestrator,
types: Class[_ <: DriverConfigurationStep]*): Unit = {
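The bootstrap itself can also be exercised directly; the suite below is a hypothetical sketch for illustration, not one of the files in this commit:

package org.apache.spark.deploy.kubernetes.submit

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.SparkFunSuite

private[spark] class MountSecretsBootstrapImplSuite extends SparkFunSuite {

  test("mounts each secret as a volume with a matching volume mount") {
    val bootstrap = new MountSecretsBootstrapImpl(Map("foo" -> "/etc/secrets/driver"))
    val pod = new PodBuilder().withNewMetadata().withName("driver").endMetadata().build()
    val container = new ContainerBuilder().withName("driver-container").build()

    val (mountedPod, mountedContainer) = bootstrap.mountSecrets(pod, container)

    // One volume per secret, named "<secretName>-volume" by the bootstrap.
    assert(mountedPod.getSpec.getVolumes.asScala.map(_.getName) === Seq("foo-volume"))
    val mounts = mountedContainer.getVolumeMounts.asScala
    assert(mounts.map(_.getName) === Seq("foo-volume"))
    assert(mounts.map(_.getMountPath) === Seq("/etc/secrets/driver"))
  }
}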