Skip to content

Commit

Permalink
fix executor account inconsistency
Browse files Browse the repository at this point in the history
  • Loading branch information
Stavros Kontopoulos committed Jun 3, 2019
1 parent 6748b48 commit a42f7a8
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,9 @@ private[spark] object Config extends Logging {
.toSequence
.createWithDefault(Nil)

val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"
val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,15 @@ private[spark] object KubernetesUtils extends Logging {
throw new SparkException(s"Error uploading file ${src.getName}", e)
}
}

def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = {
serviceAccount.map { account =>
new PodBuilder(pod.pod)
.editOrNewSpec()
.withServiceAccount(account)
.withServiceAccountName(account)
.endSpec()
.build()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilde
import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils.buildPodWithServiceAccount

private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf)
extends KubernetesFeatureConfigStep {
Expand Down Expand Up @@ -70,15 +71,7 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube

override def configurePod(pod: SparkPod): SparkPod = {
if (!shouldMountSecret) {
pod.copy(
pod = driverServiceAccount.map { account =>
new PodBuilder(pod.pod)
.editOrNewSpec()
.withServiceAccount(account)
.withServiceAccountName(account)
.endSpec()
.build()
}.getOrElse(pod.pod))
pod.copy(pod = buildPodWithServiceAccount(driverServiceAccount, pod).getOrElse(pod.pod))
} else {
val driverPodWithMountedKubernetesCredentials =
new PodBuilder(pod.pod)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_SERVICE_ACCOUNT_NAME
import org.apache.spark.deploy.k8s.KubernetesUtils.buildPodWithServiceAccount

private[spark] class ExecutorKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf)
extends KubernetesFeatureConfigStep {
private lazy val driverServiceAccount = kubernetesConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)

override def configurePod(pod: SparkPod): SparkPod = {
pod.copy(
// if not setup by the pod template fallback to the driver's sa,
// last option is the default sa.
pod = if (Option(pod.pod.getSpec.getServiceAccount).isEmpty) {
buildPodWithServiceAccount(driverServiceAccount, pod).getOrElse(pod.pod)
} else {
pod.pod
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ private[spark] class KubernetesExecutorBuilder {

val features = Seq(
new BasicExecutorFeatureStep(conf, secMgr),
new ExecutorKubernetesCredentialsFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
})
}

test("All pods have the same service account by default", k8sTestTag) {
runSparkPiAndVerifyCompletion(
executorPodChecker = (executorPod: Pod) => {
doExecutorServiceAccountCheck(executorPod, kubernetesTestComponents.serviceAccountName)
})
}

test("Run extraJVMOptions check on driver", k8sTestTag) {
sparkAppConf
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ class KubernetesSuite extends SparkFunSuite
=== baseMemory)
}

protected def doExecutorServiceAccountCheck(executorPod: Pod, account: String): Unit = {
doBasicExecutorPodCheck(executorPod)
assert(executorPod.getSpec.getServiceAccount == kubernetesTestComponents.serviceAccountName)
}

protected def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
assert(driverPod.getMetadata.getName === driverPodName)
Expand Down

0 comments on commit a42f7a8

Please sign in to comment.