Skip to content

Commit

Permalink
[SPARK-27872][K8S][2.4] Fix executor service account inconsistency
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Similar patch to #24748 but applied to the branch-2.4.
Backporting the fix to releases 2.4.x.

Closes #29877 from nssalian/SPARK-27872.

Authored-by: nssalian <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
nssalian authored and dongjoon-hyun committed Oct 12, 2020
1 parent e21c2d3 commit 5a50e30
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"
val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -60,4 +62,15 @@ private[spark] object KubernetesUtils {
}

def parseMasterUrl(url: String): String = url.substring("k8s://".length)

def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = {
serviceAccount.map { account =>
new PodBuilder(pod.pod)
.editOrNewSpec()
.withServiceAccount(account)
.withServiceAccountName(account)
.endSpec()
.build()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilde
import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils.buildPodWithServiceAccount

private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf[_])
extends KubernetesFeatureConfigStep {
Expand Down Expand Up @@ -70,15 +71,7 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube

override def configurePod(pod: SparkPod): SparkPod = {
if (!shouldMountSecret) {
pod.copy(
pod = driverServiceAccount.map { account =>
new PodBuilder(pod.pod)
.editOrNewSpec()
.withServiceAccount(account)
.withServiceAccountName(account)
.endSpec()
.build()
}.getOrElse(pod.pod))
pod.copy(pod = buildPodWithServiceAccount(driverServiceAccount, pod).getOrElse(pod.pod))
} else {
val driverPodWithMountedKubernetesCredentials =
new PodBuilder(pod.pod)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model.HasMetadata

import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_SERVICE_ACCOUNT_NAME
import org.apache.spark.deploy.k8s.KubernetesUtils.buildPodWithServiceAccount

private[spark] class ExecutorKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf[_])
extends KubernetesFeatureConfigStep {
private lazy val driverServiceAccount = kubernetesConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)

override def configurePod(pod: SparkPod): SparkPod = {
pod.copy(
// if not setup by the pod template fallback to the driver's sa,
// last option is the default sa.
pod = if (Option(pod.pod.getSpec.getServiceAccount).isEmpty) {
buildPodWithServiceAccount(driverServiceAccount, pod).getOrElse(pod.pod)
} else {
pod.pod
})
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package org.apache.spark.scheduler.cluster.k8s

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, ExecutorKubernetesCredentialsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}

private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf])
=> BasicExecutorFeatureStep =
new BasicExecutorFeatureStep(_),
provideCredentialsStep: (KubernetesConf [KubernetesExecutorSpecificConf])
=> ExecutorKubernetesCredentialsFeatureStep =
new ExecutorKubernetesCredentialsFeatureStep(_),
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
=> MountSecretsFeatureStep =
new MountSecretsFeatureStep(_),
Expand All @@ -50,8 +53,10 @@ private[spark] class KubernetesExecutorBuilder(
val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
Seq(provideVolumesStep(kubernetesConf))
} else Nil
val credentialsFeature = Seq(provideCredentialsStep(kubernetesConf))

val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature
val allFeatures =
baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature ++ credentialsFeature

var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import org.apache.spark.deploy.k8s.features._

class KubernetesExecutorBuilderSuite extends SparkFunSuite {
private val BASIC_STEP_TYPE = "basic"
private val CREDENTIALS_STEP_TYPE = "creds"
private val SECRETS_STEP_TYPE = "mount-secrets"
private val ENV_SECRETS_STEP_TYPE = "env-secrets"
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"

private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
private val credentialsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
CREDENTIALS_STEP_TYPE, classOf[ExecutorKubernetesCredentialsFeatureStep])
private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
Expand All @@ -42,6 +45,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {

private val builderUnderTest = new KubernetesExecutorBuilder(
_ => basicFeatureStep,
_ => credentialsStep,
_ => mountSecretsStep,
_ => envSecretsStep,
_ => localDirsStep,
Expand All @@ -62,7 +66,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Nil,
Seq.empty[String])
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
}

test("Apply secrets step if secrets are present.") {
Expand All @@ -82,6 +87,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
SECRETS_STEP_TYPE,
ENV_SECRETS_STEP_TYPE)
Expand Down Expand Up @@ -109,6 +115,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
MOUNT_VOLUMES_STEP_TYPE)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
})
}

test("All pods have the same service account by default", k8sTestTag) {
runSparkPiAndVerifyCompletion(
executorPodChecker = (executorPod: Pod) => {
doExecutorServiceAccountCheck(executorPod, kubernetesTestComponents.serviceAccountName)
})
}

test("Run extraJVMOptions check on driver", k8sTestTag) {
sparkAppConf
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite
=== baseMemory)
}

protected def doExecutorServiceAccountCheck(executorPod: Pod, account: String): Unit = {
doBasicExecutorPodCheck(executorPod)
assert(executorPod.getSpec.getServiceAccount == kubernetesTestComponents.serviceAccountName)
}

protected def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
assert(driverPod.getMetadata.getName === driverPodName)
Expand Down

0 comments on commit 5a50e30

Please sign in to comment.