Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
Ported SPARK-27872 to 2.4.5
Browse files Browse the repository at this point in the history
  • Loading branch information
yuchaoran2011 committed Jul 13, 2020
1 parent cee4ecb commit cc3a87c
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"
val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,15 @@ private[spark] object KubernetesUtils {
}

def parseMasterUrl(url: String): String = url.substring("k8s://".length)

def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = {
serviceAccount.map { account =>
new PodBuilder(pod.pod)
.editOrNewSpec()
.withServiceAccount(account)
.withServiceAccountName(account)
.endSpec()
.build()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilde
import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils.buildPodWithServiceAccount

private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf[_])
extends KubernetesFeatureConfigStep {
Expand Down Expand Up @@ -70,15 +71,7 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube

override def configurePod(pod: SparkPod): SparkPod = {
if (!shouldMountSecret) {
pod.copy(
pod = driverServiceAccount.map { account =>
new PodBuilder(pod.pod)
.editOrNewSpec()
.withServiceAccount(account)
.withServiceAccountName(account)
.endSpec()
.build()
}.getOrElse(pod.pod))
pod.copy(pod = buildPodWithServiceAccount(driverServiceAccount, pod).getOrElse(pod.pod))
} else {
val driverPodWithMountedKubernetesCredentials =
new PodBuilder(pod.pod)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_SERVICE_ACCOUNT_NAME
import org.apache.spark.deploy.k8s.KubernetesUtils.buildPodWithServiceAccount

private[spark] class ExecutorKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf)
extends KubernetesFeatureConfigStep {
private lazy val driverServiceAccount = kubernetesConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)

override def configurePod(pod: SparkPod): SparkPod = {
pod.copy(
// if not setup by the pod template fallback to the driver's sa,
// last option is the default sa.
pod = if (Option(pod.pod.getSpec.getServiceAccount).isEmpty) {
buildPodWithServiceAccount(driverServiceAccount, pod).getOrElse(pod.pod)
} else {
pod.pod
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ private[spark] class KubernetesExecutorBuilder(
val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
Seq(provideVolumesStep(kubernetesConf))
} else Nil
val credentialsFeature = Seq(new ExecutorKubernetesCredentialsFeatureStep(kubernetesConf))

val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature
val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature ++ credentialsFeature

var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
})
}

test("All pods have the same service account by default", k8sTestTag) {
runSparkPiAndVerifyCompletion(
executorPodChecker = (executorPod: Pod) => {
doExecutorServiceAccountCheck(executorPod, kubernetesTestComponents.serviceAccountName)
})
}

test("Run extraJVMOptions check on driver", k8sTestTag) {
sparkAppConf
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite
=== baseMemory)
}

protected def doExecutorServiceAccountCheck(executorPod: Pod, account: String): Unit = {
doBasicExecutorPodCheck(executorPod)
assert(executorPod.getSpec.getServiceAccount == kubernetesTestComponents.serviceAccountName)
}

protected def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
assert(driverPod.getMetadata.getName === driverPodName)
Expand Down

0 comments on commit cc3a87c

Please sign in to comment.