diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3
index 3d21dfd3a68a9..8a9621e5364be 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -24,7 +24,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
arrow-vector/9.0.0//arrow-vector-9.0.0.jar
audience-annotations/0.5.0//audience-annotations-0.5.0.jar
-automaton/1.11-8//automaton-1.11-8.jar
avro-ipc/1.11.1//avro-ipc-1.11.1.jar
avro-mapred/1.11.1//avro-mapred-1.11.1.jar
avro/1.11.1//avro-1.11.1.jar
@@ -69,7 +68,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
gcs-connector/hadoop2-2.2.7/shaded/gcs-connector-hadoop2-2.2.7-shaded.jar
-generex/1.0.2//generex-1.0.2.jar
gmetric4j/1.0.10//gmetric4j-1.0.10.jar
grpc-api/1.47.0//grpc-api-1.47.0.jar
grpc-context/1.47.0//grpc-context-1.47.0.jar
@@ -175,27 +173,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
-kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
-kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
-kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
-kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
-kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
-kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
-kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
-kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
-kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
-kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
-kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
-kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
-kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
-kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
-kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
-kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
-kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
-kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
-kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
-kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
+kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
+kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
+kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
+kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
+kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
+kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
+kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
+kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
+kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
+kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
+kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
+kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
+kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
+kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
+kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
+kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
+kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
+kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
+kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
+kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
+kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
+kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
+kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
+kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
lapack/3.0.2//lapack-3.0.2.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 1fab542bad938..c26dfa3f9ce7f 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -23,7 +23,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
arrow-vector/9.0.0//arrow-vector-9.0.0.jar
audience-annotations/0.5.0//audience-annotations-0.5.0.jar
-automaton/1.11-8//automaton-1.11-8.jar
avro-ipc/1.11.1//avro-ipc-1.11.1.jar
avro-mapred/1.11.1//avro-mapred-1.11.1.jar
avro/1.11.1//avro-1.11.1.jar
@@ -66,7 +65,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
gcs-connector/hadoop3-2.2.7/shaded/gcs-connector-hadoop3-2.2.7-shaded.jar
-generex/1.0.2//generex-1.0.2.jar
gmetric4j/1.0.10//gmetric4j-1.0.10.jar
grpc-api/1.47.0//grpc-api-1.47.0.jar
grpc-context/1.47.0//grpc-context-1.47.0.jar
@@ -159,27 +157,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
-kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
-kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
-kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
-kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
-kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
-kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
-kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
-kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
-kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
-kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
-kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
-kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
-kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
-kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
-kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
-kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
-kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
-kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
-kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
-kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
+kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
+kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
+kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
+kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
+kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
+kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
+kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
+kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
+kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
+kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
+kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
+kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
+kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
+kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
+kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
+kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
+kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
+kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
+kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
+kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
+kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
+kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
+kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
+kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
lapack/3.0.2//lapack-3.0.2.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
diff --git a/pom.xml b/pom.xml
index 5fbd82ad57add..22abe0f7d3cb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -216,7 +216,7 @@
     <arrow.version>9.0.0</arrow.version>
     <leveldbjni.group>org.fusesource.leveldbjni</leveldbjni.group>
-    <kubernetes-client.version>5.12.3</kubernetes-client.version>
+    <kubernetes-client.version>6.1.1</kubernetes-client.version>
     <test.java.home>${java.home}</test.java.home>
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 1c729cc441ee3..30d5f7d2bb8f8 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -75,6 +75,11 @@
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-httpclient-okhttp</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
     <dependency>
       <groupId>io.fabric8</groupId>
       <artifactId>kubernetes-client</artifactId>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 51040857c64f8..0b806f046402e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -21,7 +21,7 @@ import java.io.File
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
@@ -115,7 +115,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
}
logDebug("Kubernetes client config: " +
new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config))
- new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config)
+ new KubernetesClientBuilder()
+ .withHttpClientFactory(factoryWithCustomDispatcher)
+ .withConfig(config)
+ .build()
}
private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
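
Note: the change above replaces the removed DefaultKubernetesClient constructor with the fabric8 6.x builder API. A minimal standalone sketch of that construction pattern, with an illustrative Config (the master URL and namespace below are placeholders, not values from this patch):

    import io.fabric8.kubernetes.client.{Config, ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
    import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory

    // Build the client via KubernetesClientBuilder instead of `new DefaultKubernetesClient(...)`.
    val config: Config = new ConfigBuilder()
      .withMasterUrl("https://kubernetes.default.svc")  // placeholder master URL
      .withNamespace("default")                         // placeholder namespace
      .build()
    val client: KubernetesClient = new KubernetesClientBuilder()
      .withHttpClientFactory(new OkHttpClientFactory()) // the patch passes a factory with a custom dispatcher
      .withConfig(config)
      .build()
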
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
index 0238d5eafdea1..2ce6181a2fe9c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.k8s.submit
import scala.collection.JavaConverters._
import K8SSparkSubmitOperation.getGracePeriod
-import io.fabric8.kubernetes.api.model.{Pod, PodList}
+import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitOperation
@@ -32,17 +32,15 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
import org.apache.spark.util.{CommandLineLoggingUtils, Utils}
private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
- type NON_NAMESPACED_PODS =
- NonNamespaceOperation[Pod, PodList, PodResource[Pod]]
def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
- def listPodsInNameSpace(namespace: Option[String])
- (implicit client: KubernetesClient): NON_NAMESPACED_PODS = {
+ def getPod(namespace: Option[String], name: String)
+ (implicit client: KubernetesClient): PodResource = {
namespace match {
- case Some(ns) => client.pods.inNamespace(ns)
- case None => client.pods
+ case Some(ns) => client.pods.inNamespace(ns).withName(name)
+ case None => client.pods.withName(name)
}
}
}
@@ -50,7 +48,7 @@ private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
private class KillApplication extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
- val podToDelete = listPodsInNameSpace(namespace).withName(pName)
+ val podToDelete = getPod(namespace, pName)
if (Option(podToDelete).isDefined) {
getGracePeriod(sparkConf) match {
@@ -66,19 +64,11 @@ private class KillApplication extends K8sSubmitOp {
(implicit client: KubernetesClient): Unit = {
if (pods.nonEmpty) {
pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") }
- val listedPods = listPodsInNameSpace(namespace)
-
getGracePeriod(sparkConf) match {
case Some(period) =>
- // this is not using the batch api because no option is provided
- // when using the grace period.
- pods.foreach { pod =>
- listedPods
- .withName(pod.getMetadata.getName)
- .withGracePeriod(period)
- .delete()
- }
- case _ => listedPods.delete(pods.asJava)
+ client.resourceList(pods.asJava).withGracePeriod(period).delete()
+ case _ =>
+ client.resourceList(pods.asJava).delete()
}
} else {
printMessage("No applications found.")
@@ -89,7 +79,7 @@ private class KillApplication extends K8sSubmitOp {
private class ListStatus extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
- val pod = listPodsInNameSpace(namespace).withName(pName).get()
+ val pod = getPod(namespace, pName).get()
if (Option(pod).isDefined) {
printMessage("Application status (driver): " +
Option(pod).map(formatPodState).getOrElse("unknown."))
@@ -145,13 +135,12 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
.pods
}
val pods = ops
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
.list()
.getItems
.asScala
.filter { pod =>
- val meta = pod.getMetadata
- meta.getName.startsWith(pName.stripSuffix("*")) &&
- meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
+ pod.getMetadata.getName.startsWith(pName.stripSuffix("*"))
}.toList
op.executeOnGlob(pods, namespace, sparkConf)
} else {
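
Note: the KillApplication path above now relies on the 6.x ability to apply a grace period to a whole resource list instead of deleting pods one by one. A minimal sketch of that call shape (the helper name below is illustrative):

    import scala.collection.JavaConverters._
    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.KubernetesClient

    // Delete a batch of driver pods, optionally with a grace period, as the patch does.
    def deleteDriverPods(client: KubernetesClient, pods: List[Pod], gracePeriod: Option[Long]): Unit =
      gracePeriod match {
        case Some(period) => client.resourceList(pods.asJava).withGracePeriod(period).delete()
        case None => client.resourceList(pods.asJava).delete()
      }
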
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 3a3ab081fe843..14d3c4d1f42f4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -149,7 +149,8 @@ private[spark] class Client(
var watch: Watch = null
var createdDriverPod: Pod = null
try {
- createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+ createdDriverPod =
+ kubernetesClient.pods().inNamespace(conf.namespace).resource(resolvedDriverPod).create()
} catch {
case NonFatal(e) =>
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
@@ -163,7 +164,7 @@ private[spark] class Client(
kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
- kubernetesClient.pods().delete(createdDriverPod)
+ kubernetesClient.pods().resource(createdDriverPod).delete()
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
throw e
}
@@ -175,7 +176,7 @@ private[spark] class Client(
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
- kubernetesClient.pods().delete(createdDriverPod)
+ kubernetesClient.pods().resource(createdDriverPod).delete()
throw e
}
@@ -185,6 +186,7 @@ private[spark] class Client(
while (true) {
val podWithName = kubernetesClient
.pods()
+ .inNamespace(conf.namespace)
.withName(driverPodName)
// Reset resource to old before we start the watch, this is important for race conditions
watcher.reset()
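
Note: in 6.x a pod is created or deleted by first selecting it as a resource in an explicit namespace, which is why the calls above change from pods().create(pod) / pods().delete(pod) to resource(...).create() / resource(...).delete(). A minimal sketch of the pattern (pod name and namespace are placeholders):

    import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
    import io.fabric8.kubernetes.client.KubernetesClient

    def createThenDelete(client: KubernetesClient, namespace: String): Unit = {
      val pod: Pod = new PodBuilder()
        .withNewMetadata().withName("example-driver").endMetadata()  // placeholder pod
        .build()
      // create() returns the server-side object, which can then be targeted for deletion.
      val created = client.pods().inNamespace(namespace).resource(pod).create()
      client.pods().inNamespace(namespace).resource(created).delete()
    }
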
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 9bdc30e446646..524ab0c845c6d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -76,6 +76,7 @@ class ExecutorPodsAllocator(
val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
+ .inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
@@ -112,6 +113,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
+ .inNamespace(namespace)
.withName(pod.getMetadata.getName)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
@@ -185,6 +187,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
+ .inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
@@ -299,6 +302,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
+ .inNamespace(namespace)
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
@@ -363,6 +367,7 @@ class ExecutorPodsAllocator(
try {
val createdPVCs = kubernetesClient
.persistentVolumeClaims
+ .inNamespace(namespace)
.withLabel("spark-app-selector", applicationId)
.list()
.getItems
@@ -406,7 +411,8 @@ class ExecutorPodsAllocator(
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
- val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
+ val createdExecutorPod =
+ kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
try {
addOwnerReference(createdExecutorPod, resources)
resources
@@ -418,13 +424,16 @@ class ExecutorPodsAllocator(
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
s"StorageClass ${pvc.getSpec.getStorageClassName}")
- kubernetesClient.persistentVolumeClaims().create(pvc)
+ kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
} catch {
case NonFatal(e) =>
- kubernetesClient.pods().delete(createdExecutorPod)
+ kubernetesClient.pods()
+ .inNamespace(namespace)
+ .resource(createdExecutorPod)
+ .delete()
throw e
}
}
@@ -475,6 +484,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
+ .inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index e255de4d2dd9e..7d32b35eab95f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -17,6 +17,7 @@
package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.TimeUnit
+import java.util.function.UnaryOperator
import com.google.common.cache.CacheBuilder
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
@@ -57,6 +58,8 @@ private[spark] class ExecutorPodsLifecycleManager(
// This set is cleaned up when a snapshot containing the updated pod is processed.
private val inactivatedPods = mutable.HashSet.empty[Long]
+ private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
snapshotsStore.addSubscriber(eventProcessingInterval) {
@@ -168,6 +171,7 @@ private[spark] class ExecutorPodsLifecycleManager(
// of getting rid of the pod is what matters.
kubernetesClient
.pods()
+ .inNamespace(namespace)
.withName(updatedPod.getMetadata.getName)
.delete()
} else if (!inactivatedPods.contains(execId) && !isPodInactive(updatedPod)) {
@@ -175,16 +179,11 @@ private[spark] class ExecutorPodsLifecycleManager(
// can be ignored in future updates from the API server.
logDebug(s"Marking executor ${updatedPod.getMetadata.getName} as inactive since " +
"deletion is disabled.")
- val inactivatedPod = new PodBuilder(updatedPod)
- .editMetadata()
- .addToLabels(Map(SPARK_EXECUTOR_INACTIVE_LABEL -> "true").asJava)
- .endMetadata()
- .build()
-
kubernetesClient
.pods()
+ .inNamespace(namespace)
.withName(updatedPod.getMetadata.getName)
- .patch(inactivatedPod)
+ .edit(executorInactivationFn)
inactivatedPods += execId
}
@@ -274,4 +273,9 @@ private object ExecutorPodsLifecycleManager {
s"${code}${humanStr}"
}
+ def executorInactivationFn: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
+ .editOrNewMetadata()
+ .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
+ .endMetadata()
+ .build()
}
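
Note: the lifecycle manager above now uses edit(...) with a UnaryOperator instead of building a modified pod and calling patch(...). A minimal sketch of that edit pattern (client, namespace, pod name, and label key are placeholders):

    import java.util.function.UnaryOperator
    import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
    import io.fabric8.kubernetes.client.KubernetesClient

    // The operator receives the current server-side Pod and returns the desired one.
    val addInactiveLabel: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
      .editOrNewMetadata()
      .addToLabels("spark-exec-inactive", "true")  // placeholder label key
      .endMetadata()
      .build()

    def markInactive(client: KubernetesClient, namespace: String, podName: String): Unit =
      client.pods().inNamespace(namespace).withName(podName).edit(addInactiveLabel)
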
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
index a334ece565377..4809222650d82 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
@@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_NAMESPACE
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -46,6 +47,8 @@ class ExecutorPodsWatchSnapshotSource(
private var watchConnection: Closeable = _
private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER)
+ private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
// If we're constructed with the old API get the SparkConf from the running SparkContext.
def this(snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) = {
this(snapshotsStore, kubernetesClient, SparkContext.getOrCreate().conf)
@@ -58,6 +61,7 @@ class ExecutorPodsWatchSnapshotSource(
logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
watchConnection = kubernetesClient.pods()
+ .inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 985b8b7bef051..4a8cb6d705051 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -19,7 +19,6 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConverters._
import scala.concurrent.Future
import io.fabric8.kubernetes.api.model.Pod
@@ -69,6 +68,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val defaultProfile = scheduler.sc.resourceProfileManager.defaultResourceProfile
+ private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
// Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
removeExecutor(executorId, reason)
@@ -77,7 +78,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
private def setUpExecutorConfigMap(driverPod: Option[Pod]): Unit = {
val configMapName = KubernetesClientUtils.configMapNameExecutor
val resolvedExecutorProperties =
- Map(KUBERNETES_NAMESPACE.key -> conf.get(KUBERNETES_NAMESPACE))
+ Map(KUBERNETES_NAMESPACE.key -> namespace)
val confFilesMap = KubernetesClientUtils
.buildSparkConfDirFilesMap(configMapName, conf, resolvedExecutorProperties) ++
resolvedExecutorProperties
@@ -85,7 +86,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
Map(SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap, labels)
KubernetesUtils.addOwnerReference(driverPod.orNull, Seq(configMap))
- kubernetesClient.configMaps().create(configMap)
+ kubernetesClient.configMaps().inNamespace(namespace).resource(configMap).create()
}
/**
@@ -136,6 +137,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
Utils.tryLogNonFatalError {
kubernetesClient
.services()
+ .inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.delete()
}
@@ -145,6 +147,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
Utils.tryLogNonFatalError {
kubernetesClient
.persistentVolumeClaims()
+ .inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.delete()
}
@@ -158,6 +161,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
Utils.tryLogNonFatalError {
kubernetesClient
.configMaps()
+ .inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
@@ -193,22 +197,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label =>
val labelTask = new Runnable() {
override def run(): Unit = Utils.tryLogNonFatalError {
-
- val podsToLabel = kubernetesClient.pods()
+ kubernetesClient.pods()
+ .inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
- .list().getItems().asScala
-
- podsToLabel.foreach { pod =>
- kubernetesClient.pods()
- .inNamespace(pod.getMetadata.getNamespace)
- .withName(pod.getMetadata.getName)
- .edit({p: Pod => new PodBuilder(p).editMetadata()
- .addToLabels(label,
- conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse(""))
- .endMetadata()
- .build()})
+ .resources()
+ .forEach { podResource =>
+ podResource.edit({ p: Pod =>
+ new PodBuilder(p).editOrNewMetadata()
+ .addToLabels(label,
+ conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse(""))
+ .endMetadata()
+ .build()})
}
}
}
@@ -246,6 +247,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
override def run(): Unit = Utils.tryLogNonFatalError {
val running = kubernetesClient
.pods()
+ .inNamespace(namespace)
.withField("status.phase", "Running")
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
@@ -302,6 +304,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
override def run(): Unit = Utils.tryLogNonFatalError {
// Label the pod with it's exec ID
kubernetesClient.pods()
+ .inNamespace(namespace)
.withName(x.podName)
.edit({p: Pod => new PodBuilder(p).editMetadata()
.addToLabels(SPARK_EXECUTOR_ID_LABEL, newId)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala
index 294ee70168b23..5eeab5501e47f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala
@@ -53,6 +53,7 @@ class StatefulSetPodsAllocator(
val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
+ .inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
@@ -69,6 +70,7 @@ class StatefulSetPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
+ .inNamespace(namespace)
.withName(pod.getMetadata.getName)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
@@ -99,7 +101,7 @@ class StatefulSetPodsAllocator(
applicationId: String,
resourceProfileId: Int): Unit = {
if (setsCreated.contains(resourceProfileId)) {
- val statefulset = kubernetesClient.apps().statefulSets().withName(
+ val statefulset = kubernetesClient.apps().statefulSets().inNamespace(namespace).withName(
setName(applicationId, resourceProfileId: Int))
statefulset.scale(expected, false /* wait */)
} else {
@@ -169,7 +171,7 @@ class StatefulSetPodsAllocator(
val statefulSet = new io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder()
.withNewMetadata()
.withName(setName(applicationId, resourceProfileId))
- .withNamespace(conf.get(KUBERNETES_NAMESPACE))
+ .withNamespace(namespace)
.endMetadata()
.withNewSpec()
.withPodManagementPolicy("Parallel")
@@ -185,7 +187,7 @@ class StatefulSetPodsAllocator(
.build()
addOwnerReference(driverPod.get, Seq(statefulSet))
- kubernetesClient.apps().statefulSets().create(statefulSet)
+ kubernetesClient.apps().statefulSets().inNamespace(namespace).resource(statefulSet).create()
setsCreated += (resourceProfileId)
}
}
@@ -194,7 +196,12 @@ class StatefulSetPodsAllocator(
// Cleanup the statefulsets when we stop
setsCreated.foreach { rpid =>
Utils.tryLogNonFatalError {
- kubernetesClient.apps().statefulSets().withName(setName(applicationId, rpid)).delete()
+ kubernetesClient
+ .apps()
+ .statefulSets()
+ .inNamespace(namespace)
+ .withName(setName(applicationId, rpid))
+ .delete()
}
}
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
index 14405da72810c..1a4bc9781da2f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
@@ -17,19 +17,31 @@
package org.apache.spark.deploy.k8s
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList}
-import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, Resource}
+import io.fabric8.kubernetes.api.model.apps.StatefulSet
+import io.fabric8.kubernetes.api.model.apps.StatefulSetList
+import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NonNamespaceOperation, PodResource, Resource, RollableScalableResource}
object Fabric8Aliases {
- type PODS = MixedOperation[Pod, PodList, PodResource[Pod]]
+ type PODS = MixedOperation[Pod, PodList, PodResource]
+ type PODS_WITH_NAMESPACE = NonNamespaceOperation[Pod, PodList, PodResource]
type CONFIG_MAPS = MixedOperation[
ConfigMap, ConfigMapList, Resource[ConfigMap]]
- type LABELED_PODS = FilterWatchListDeletable[Pod, PodList]
- type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList]
- type SINGLE_POD = PodResource[Pod]
+ type CONFIG_MAPS_WITH_NAMESPACE =
+ NonNamespaceOperation[ConfigMap, ConfigMapList, Resource[ConfigMap]]
+ type CONFIG_MAPS_RESOURCE = Resource[ConfigMap]
+ type LABELED_PODS = FilterWatchListDeletable[Pod, PodList, PodResource]
+ type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList, Resource[ConfigMap]]
+ type SINGLE_POD = PodResource
type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
HasMetadata]
+ type STATEFUL_SET_RES = RollableScalableResource[StatefulSet]
+ type STATEFUL_SETS = MixedOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
+ type STATEFUL_SETS_NAMESPACED =
+ NonNamespaceOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
Resource[PersistentVolumeClaim]]
- type LABELED_PERSISTENT_VOLUME_CLAIMS =
- FilterWatchListDeletable[PersistentVolumeClaim, PersistentVolumeClaimList]
+ type PVC_WITH_NAMESPACE = NonNamespaceOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
+ Resource[PersistentVolumeClaim]]
+ type LABELED_PERSISTENT_VOLUME_CLAIMS = FilterWatchListDeletable[PersistentVolumeClaim,
+ PersistentVolumeClaimList, Resource[PersistentVolumeClaim]]
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
index 642c18db541e1..dc2d354af9891 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
@@ -20,12 +20,13 @@ import java.io.File
import io.fabric8.kubernetes.api.model.{Config => _, _}
import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, never, verify, when}
import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.deploy.k8s.features.{KubernetesDriverCustomFeatureConfigStep, KubernetesExecutorCustomFeatureConfigStep, KubernetesFeatureConfigStep}
import org.apache.spark.internal.config.ConfigEntry
@@ -156,9 +157,8 @@ abstract class PodBuilderSuite extends SparkFunSuite {
protected def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
val kubernetesClient = mock(classOf[KubernetesClient])
- val pods =
- mock(classOf[MixedOperation[Pod, PodList, PodResource[Pod]]])
- val podResource = mock(classOf[PodResource[Pod]])
+ val pods = mock(classOf[PODS])
+ val podResource = mock(classOf[PodResource])
when(kubernetesClient.pods()).thenReturn(pods)
when(pods.load(any(classOf[File]))).thenReturn(podResource)
when(podResource.get()).thenReturn(pod)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 12a5202b9d067..a8c25ab5002c0 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -149,7 +149,10 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private var podOperations: PODS = _
@Mock
- private var namedPods: PodResource[Pod] = _
+ private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
+ @Mock
+ private var namedPods: PodResource = _
@Mock
private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
@@ -170,11 +173,13 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)).thenReturn(BUILT_KUBERNETES_SPEC)
when(kubernetesClient.pods()).thenReturn(podOperations)
- when(podOperations.withName(POD_NAME)).thenReturn(namedPods)
+ when(podOperations.inNamespace(kconf.namespace)).thenReturn(podsWithNamespace)
+ when(podsWithNamespace.withName(POD_NAME)).thenReturn(namedPods)
createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod])
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
- when(podOperations.create(fullExpectedPod())).thenReturn(podWithOwnerReference())
+ when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods)
+ when(namedPods.create()).thenReturn(podWithOwnerReference())
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
doReturn(resourceList)
@@ -189,7 +194,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesClient,
loggingPodStatusWatcher)
submissionClient.run()
- verify(podOperations).create(fullExpectedPod())
+ verify(podsWithNamespace).resource(fullExpectedPod())
+ verify(namedPods).create()
}
test("The client should create Kubernetes resources") {
@@ -298,8 +304,9 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
val expectedKeyToPaths = (expectedConfFiles.map(x => new KeyToPath(x, 420, x)).toList ++
List(KEY_TO_PATH)).sortBy(x => x.getKey)
- when(podOperations.create(fullExpectedPod(expectedKeyToPaths)))
- .thenReturn(podWithOwnerReference(expectedKeyToPaths))
+ when(podsWithNamespace.resource(fullExpectedPod(expectedKeyToPaths)))
+ .thenReturn(namedPods)
+ when(namedPods.create()).thenReturn(podWithOwnerReference(expectedKeyToPaths))
kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf,
resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala
index 142d3fe112d69..3d30fb320d641 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala
@@ -17,12 +17,13 @@
package org.apache.spark.deploy.k8s.submit
import java.io.PrintStream
+import java.util.Arrays
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.PodResource
+import io.fabric8.kubernetes.client.{KubernetesClient, PropagationPolicyConfigurable}
+import io.fabric8.kubernetes.client.dsl.{Deletable, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
import org.mockito.{ArgumentMatchers, Mock, MockitoAnnotations}
import org.mockito.Mockito.{times, verify, when}
import org.scalatest.BeforeAndAfter
@@ -30,9 +31,10 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_SUBMIT_GRACE_PERIOD
import org.apache.spark.deploy.k8s.Constants.{SPARK_APP_ID_LABEL, SPARK_POD_DRIVER_ROLE, SPARK_ROLE_LABEL}
-import org.apache.spark.deploy.k8s.Fabric8Aliases.PODS
+import org.apache.spark.deploy.k8s.Fabric8Aliases.{PODS, PODS_WITH_NAMESPACE}
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
+
class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter {
private val driverPodName1 = "driver1"
private val driverPodName2 = "driver2"
@@ -45,28 +47,39 @@ class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter {
private var podOperations: PODS = _
@Mock
- private var driverPodOperations1: PodResource[Pod] = _
+ private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
+ @Mock
+ private var driverPodOperations1: PodResource = _
@Mock
- private var driverPodOperations2: PodResource[Pod] = _
+ private var driverPodOperations2: PodResource = _
@Mock
private var kubernetesClient: KubernetesClient = _
+ @Mock
+ private var deletable: PropagationPolicyConfigurable[_ <: Deletable] = _
+
+ @Mock
+ private var deletableList:
+ NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[HasMetadata] = _
+
@Mock
private var err: PrintStream = _
+ private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
+
before {
MockitoAnnotations.openMocks(this).close()
when(kubernetesClient.pods()).thenReturn(podOperations)
- when(podOperations.inNamespace(namespace)).thenReturn(podOperations)
- when(podOperations.delete(podList.asJava)).thenReturn(true)
- when(podOperations.withName(driverPodName1)).thenReturn(driverPodOperations1)
- when(podOperations.withName(driverPodName2)).thenReturn(driverPodOperations2)
+ when(podOperations.inNamespace(namespace)).thenReturn(podsWithNamespace)
+ when(podsWithNamespace.withName(driverPodName1)).thenReturn(driverPodOperations1)
+ when(podsWithNamespace.withName(driverPodName2)).thenReturn(driverPodOperations2)
when(driverPodOperations1.get).thenReturn(driverPod1)
- when(driverPodOperations1.delete()).thenReturn(true)
+ when(driverPodOperations1.delete()).thenReturn(Arrays.asList(new StatusDetails))
when(driverPodOperations2.get).thenReturn(driverPod2)
- when(driverPodOperations2.delete()).thenReturn(true)
+ when(driverPodOperations2.delete()).thenReturn(Arrays.asList(new StatusDetails))
}
test("List app status") {
@@ -101,18 +114,19 @@ class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter {
implicit val kubeClient: KubernetesClient = kubernetesClient
val killApp = new KillApplication
val conf = new SparkConf().set(KUBERNETES_SUBMIT_GRACE_PERIOD, 1L)
- when(driverPodOperations1.withGracePeriod(1L)).thenReturn(driverPodOperations1)
+ doReturn(deletable).when(driverPodOperations1).withGracePeriod(1L)
killApp.executeOnPod(driverPodName1, Option(namespace), conf)
verify(driverPodOperations1, times(1)).withGracePeriod(1L)
- verify(driverPodOperations1, times(1)).delete()
+ verify(deletable, times(1)).delete()
}
test("Kill multiple apps with glob without gracePeriod") {
implicit val kubeClient: KubernetesClient = kubernetesClient
val killApp = new KillApplication
killApp.printStream = err
+ doReturn(deletableList).when(kubernetesClient).resourceList(podList.asJava)
killApp.executeOnGlob(podList, Option(namespace), new SparkConf())
- verify(podOperations, times(1)).delete(podList.asJava)
+ verify(deletableList, times(1)).delete()
// scalastyle:off
verify(err).println(ArgumentMatchers.eq(s"Deleting driver pod: $driverPodName1."))
verify(err).println(ArgumentMatchers.eq(s"Deleting driver pod: $driverPodName2."))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 7ce0b57d1e9f3..caec9ef920121 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -26,7 +26,7 @@ import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{Mock, MockitoAnnotations}
-import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.ArgumentMatchers.{any, anyString, eq => meq}
import org.mockito.Mockito.{never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@@ -77,9 +77,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
@Mock
private var podOperations: PODS = _
+ @Mock
+ private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
+ @Mock
+ private var podResource: PodResource = _
+
@Mock
private var persistentVolumeClaims: PERSISTENT_VOLUME_CLAIMS = _
+ @Mock
+ private var pvcWithNamespace: PVC_WITH_NAMESPACE = _
+
@Mock
private var labeledPersistentVolumeClaims: LABELED_PERSISTENT_VOLUME_CLAIMS = _
@@ -90,7 +99,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
private var labeledPods: LABELED_PODS = _
@Mock
- private var driverPodOperations: PodResource[Pod] = _
+ private var driverPodOperations: PodResource = _
@Mock
private var executorBuilder: KubernetesExecutorBuilder = _
@@ -107,7 +116,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
before {
MockitoAnnotations.openMocks(this).close()
when(kubernetesClient.pods()).thenReturn(podOperations)
- when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
+ when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
+ when(podsWithNamespace.withName(driverPodName)).thenReturn(driverPodOperations)
+ when(podsWithNamespace.resource(any())).thenReturn(podResource)
+ when(podsWithNamespace.withLabel(anyString(), anyString())).thenReturn(labeledPods)
+ when(podsWithNamespace.withLabelIn(anyString(), any())).thenReturn(labeledPods)
+ when(podsWithNamespace.withField(anyString(), anyString())).thenReturn(labeledPods)
+ when(labeledPods.withLabel(anyString(), anyString())).thenReturn(labeledPods)
+ when(labeledPods.withLabelIn(anyString(), any())).thenReturn(labeledPods)
+ when(labeledPods.withField(anyString(), anyString())).thenReturn(labeledPods)
when(driverPodOperations.get).thenReturn(driverPod)
when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod)
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
@@ -119,7 +136,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
- when(persistentVolumeClaims.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
+ when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace)
+ when(pvcWithNamespace.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
when(persistentVolumeClaimList.getItems).thenReturn(Seq.empty[PersistentVolumeClaim].asJava)
}
@@ -170,9 +188,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
- verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
- verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id))
- verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id))
+ verify(podResource, times(3)).create()
// Mark executor 2 and 3 as pending, leave 1 as newly created but this does not free up
// any pending pod slot so no new pod is requested
@@ -180,8 +199,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
snapshotsStore.updatePod(pendingExecutor(3, rp.id))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
- verify(podOperations, times(3)).create(any())
- verify(podOperations, never()).delete()
+ verify(podResource, times(3)).create()
+ verify(labeledPods, never()).delete()
// Downscaling for defaultProfile resource ID with 1 executor to make one free slot
// for pendings pods
@@ -189,16 +208,16 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 3))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
- verify(podOperations).create(podWithAttachedContainerForId(4, rp.id))
- verify(podOperations, times(1)).delete()
+ verify(labeledPods, times(1)).delete()
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, rp.id))
// Make one pod running this way we have one more free slot for pending pods
snapshotsStore.updatePod(runningExecutor(3, rp.id))
snapshotsStore.updatePod(pendingExecutor(4, rp.id))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
- verify(podOperations).create(podWithAttachedContainerForId(5, rp.id))
- verify(podOperations, times(1)).delete()
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, rp.id))
+ verify(labeledPods, times(1)).delete()
}
test("Initially request executors in batches. Do not request another batch if the" +
@@ -206,9 +225,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
for (nextId <- 1 to podAllocationSize) {
- verify(podOperations).create(podWithAttachedContainerForId(nextId))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(nextId))
}
- verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+ verify(podsWithNamespace, never())
+ .resource(podWithAttachedContainerForId(podAllocationSize + 1))
}
test("Request executors in batches. Allow another batch to be requested if" +
@@ -225,15 +245,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 5)
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
- verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+ verify(podsWithNamespace, never())
+ .resource(podWithAttachedContainerForId(podAllocationSize + 1))
+ verify(podResource, times(podAllocationSize)).create()
snapshotsStore.updatePod(runningExecutor(podAllocationSize))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
- verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(podAllocationSize + 1))
snapshotsStore.updatePod(runningExecutor(podAllocationSize))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
- verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod]))
+ verify(podResource, times(podAllocationSize + 1)).create()
}
test("When a current batch reaches error states immediately, re-request" +
@@ -248,14 +270,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
snapshotsStore.updatePod(failedPod)
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
- verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(podAllocationSize + 1))
}
test("Verify stopping deletes the labeled pods") {
- when(podOperations
+ when(podsWithNamespace
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(labeledPods)
podsAllocatorUnderTest.stop(TEST_SPARK_APP_ID)
@@ -264,39 +286,39 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
test("When an executor is requested but the API does not report it in a reasonable time, retry" +
" requesting that executor.") {
- when(podOperations
+ when(podsWithNamespace
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
.thenReturn(labeledPods)
podsAllocatorUnderTest.setTotalExpectedExecutors(
Map(defaultProfile -> 1))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
- verify(podOperations).create(podWithAttachedContainerForId(1))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
verify(labeledPods).delete()
- verify(podOperations).create(podWithAttachedContainerForId(2))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
}
test("SPARK-28487: scale up and down on target executor count changes") {
- when(podOperations
+ when(podsWithNamespace
.withField("status.phase", "Pending"))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
- .thenReturn(podOperations)
+ .thenReturn(labeledPods)
val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)
@@ -305,31 +327,31 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest.setTotalExpectedExecutors(
Map(defaultProfile -> 1))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
- verify(podOperations).create(podWithAttachedContainerForId(1))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
// Mark executor as running, verify that subsequent allocation cycle is a no-op.
snapshotsStore.updatePod(runningExecutor(1))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
- verify(podOperations, times(1)).create(any())
- verify(podOperations, never()).delete()
+ verify(podResource, times(1)).create()
+ verify(labeledPods, never()).delete()
// Request 3 more executors, make sure all are requested.
podsAllocatorUnderTest.setTotalExpectedExecutors(
Map(defaultProfile -> 4))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
- verify(podOperations).create(podWithAttachedContainerForId(2))
- verify(podOperations).create(podWithAttachedContainerForId(3))
- verify(podOperations).create(podWithAttachedContainerForId(4))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(3))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(4))
// Mark 2 as running, 3 as pending. Allocation cycle should do nothing.
snapshotsStore.updatePod(runningExecutor(2))
snapshotsStore.updatePod(pendingExecutor(3))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
- verify(podOperations, times(4)).create(any())
- verify(podOperations, never()).delete()
+ verify(podResource, times(4)).create()
+ verify(labeledPods, never()).delete()
// Scale down to 1. Pending executors (both acknowledged and not) should be deleted.
waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
@@ -337,9 +359,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
Map(defaultProfile -> 1))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
- verify(podOperations, times(4)).create(any())
- verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
- verify(podOperations).delete()
+ verify(podResource, times(4)).create()
+ verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
+ verify(labeledPods).delete()
assert(podsAllocatorUnderTest.isDeleted("3"))
assert(podsAllocatorUnderTest.isDeleted("4"))
@@ -355,25 +377,25 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
}
test("SPARK-34334: correctly identify timed out pending pod requests as excess") {
- when(podOperations
+ when(podsWithNamespace
.withField("status.phase", "Pending"))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
- .thenReturn(podOperations)
+ .thenReturn(labeledPods)
val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
- verify(podOperations).create(podWithAttachedContainerForId(1))
- verify(podOperations).create(any())
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
+ verify(podResource).create()
snapshotsStore.updatePod(pendingExecutor(1))
snapshotsStore.notifySubscribers()
@@ -382,48 +404,48 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
snapshotsStore.notifySubscribers()
- verify(podOperations).create(podWithAttachedContainerForId(2))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
snapshotsStore.notifySubscribers()
- verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
- verify(podOperations, never()).delete()
+ verify(labeledPods, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
+ verify(labeledPods, never()).delete()
waitForExecutorPodsClock.advance(executorIdleTimeout)
snapshotsStore.notifySubscribers()
// before SPARK-34334 this verify() call failed as the non-timed out newly created request
// decreased the number of requests taken from timed out pending pod requests
- verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
- verify(podOperations).delete()
+ verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
+ verify(labeledPods).delete()
}
test("SPARK-33099: Respect executor idle timeout configuration") {
- when(podOperations
+ when(podsWithNamespace
.withField("status.phase", "Pending"))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
- .thenReturn(podOperations)
+ .thenReturn(labeledPods)
val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 5))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
- verify(podOperations).create(podWithAttachedContainerForId(1))
- verify(podOperations).create(podWithAttachedContainerForId(2))
- verify(podOperations).create(podWithAttachedContainerForId(3))
- verify(podOperations).create(podWithAttachedContainerForId(4))
- verify(podOperations).create(podWithAttachedContainerForId(5))
- verify(podOperations, times(5)).create(any())
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(3))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(4))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(5))
+ verify(podResource, times(5)).create()
snapshotsStore.updatePod(pendingExecutor(1))
snapshotsStore.updatePod(pendingExecutor(2))
@@ -433,7 +455,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5")
- verify(podOperations, never()).delete()
+ verify(podResource, never()).delete()
// Newly created executors (both acknowledged and not) are cleaned up.
waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
@@ -444,8 +466,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
// though executor 1 is still in pending state and executor 3 and 4 are new request without
// any state reported by kubernetes and all the three are already timed out
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
- verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "5")
- verify(podOperations).delete()
+ verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "5")
+ verify(labeledPods).delete()
}
/**
@@ -483,18 +505,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
* PODs: 8 and 9
*/
test("SPARK-34361: scheduler backend known pods with multiple resource profiles at downscaling") {
- when(podOperations
+ when(podsWithNamespace
.withField("status.phase", "Pending"))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
- .thenReturn(podOperations)
+ .thenReturn(labeledPods)
val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)
@@ -510,20 +532,20 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
// 0) request 3 PODs for the default and 4 PODs for the other resource profile
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 4))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 7)
- verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
- verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id))
- verify(podOperations).create(podWithAttachedContainerForId(3, defaultProfile.id))
- verify(podOperations).create(podWithAttachedContainerForId(4, rp.id))
- verify(podOperations).create(podWithAttachedContainerForId(5, rp.id))
- verify(podOperations).create(podWithAttachedContainerForId(6, rp.id))
- verify(podOperations).create(podWithAttachedContainerForId(7, rp.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, rp.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, rp.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(6, rp.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(7, rp.id))
// 1) make 1 POD known by the scheduler backend for each resource profile
when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "4"))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5,
"scheduler backend known PODs are not outstanding")
- verify(podOperations, times(7)).create(any())
+ verify(podResource, times(7)).create()
// 2) make 1 extra POD known by the scheduler backend for each resource profile
// and make some to pending
@@ -534,15 +556,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
snapshotsStore.updatePod(pendingExecutor(6, rp.id))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
- verify(podOperations, times(7)).create(any())
+ verify(podResource, times(7)).create()
// 3) downscale to 1 POD for default and 1 POD for the other resource profile
waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
- verify(podOperations, times(7)).create(any())
- verify(podOperations, times(2)).delete()
+ verify(podResource, times(7)).create()
+ verify(labeledPods, times(2)).delete()
assert(podsAllocatorUnderTest.isDeleted("3"))
assert(podsAllocatorUnderTest.isDeleted("6"))
assert(podsAllocatorUnderTest.isDeleted("7"))
@@ -551,32 +573,32 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
// 2 PODs known by the scheduler backend there must be no new POD requested to be created
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 2))
snapshotsStore.notifySubscribers()
- verify(podOperations, times(7)).create(any())
+ verify(podResource, times(7)).create()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
- verify(podOperations, times(7)).create(any())
+ verify(podResource, times(7)).create()
// 5) requesting 1 more executor for each resource
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 3))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
- verify(podOperations, times(9)).create(any())
- verify(podOperations).create(podWithAttachedContainerForId(8, defaultProfile.id))
- verify(podOperations).create(podWithAttachedContainerForId(9, rp.id))
+ verify(podResource, times(9)).create()
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(8, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(9, rp.id))
}
test("SPARK-33288: multiple resource profiles") {
- when(podOperations
+ when(podsWithNamespace
.withField("status.phase", "Pending"))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
- .thenReturn(podOperations)
+ .thenReturn(labeledPods)
val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)
@@ -593,9 +615,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
// make sure it's requested, even with an empty initial snapshot.
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 2))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
- verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
- verify(podOperations).create(podWithAttachedContainerForId(2, rp.id))
- verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, rp.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id))
// Mark executor as running, verify that subsequent allocation cycle is a no-op.
snapshotsStore.updatePod(runningExecutor(1, defaultProfile.id))
@@ -603,18 +625,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
snapshotsStore.updatePod(runningExecutor(3, rp.id))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
- verify(podOperations, times(3)).create(any())
- verify(podOperations, never()).delete()
+ verify(podResource, times(3)).create()
+ verify(podResource, never()).delete()
// Request 3 more executors for default profile and 1 more for other profile,
// make sure all are requested.
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4, rp -> 3))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4)
- verify(podOperations).create(podWithAttachedContainerForId(4, defaultProfile.id))
- verify(podOperations).create(podWithAttachedContainerForId(5, defaultProfile.id))
- verify(podOperations).create(podWithAttachedContainerForId(6, defaultProfile.id))
- verify(podOperations).create(podWithAttachedContainerForId(7, rp.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(6, defaultProfile.id))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(7, rp.id))
// Mark 4 as running, 5 and 7 as pending. Allocation cycle should do nothing.
snapshotsStore.updatePod(runningExecutor(4, defaultProfile.id))
@@ -622,8 +644,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
snapshotsStore.updatePod(pendingExecutor(7, rp.id))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
- verify(podOperations, times(7)).create(any())
- verify(podOperations, never()).delete()
+ verify(podResource, times(7)).create()
+ verify(podResource, never()).delete()
// Scale down to 1 for both resource profiles. Pending executors
// (both acknowledged and not) should be deleted.
@@ -631,10 +653,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
- verify(podOperations, times(7)).create(any())
- verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6")
- verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7")
- verify(podOperations, times(2)).delete()
+ verify(podResource, times(7)).create()
+ verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6")
+ verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7")
+ verify(labeledPods, times(2)).delete()
assert(podsAllocatorUnderTest.isDeleted("5"))
assert(podsAllocatorUnderTest.isDeleted("6"))
assert(podsAllocatorUnderTest.isDeleted("7"))
@@ -653,27 +675,27 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
}
test("SPARK-33262: pod allocator does not stall with pending pods") {
- when(podOperations
+ when(podsWithNamespace
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
.thenReturn(labeledPods)
- when(podOperations
+ when(labeledPods
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "3", "4", "5", "6"))
- .thenReturn(podOperations)
+ .thenReturn(labeledPods)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 6))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
// Initial request of pods
- verify(podOperations).create(podWithAttachedContainerForId(1))
- verify(podOperations).create(podWithAttachedContainerForId(2))
- verify(podOperations).create(podWithAttachedContainerForId(3))
- verify(podOperations).create(podWithAttachedContainerForId(4))
- verify(podOperations).create(podWithAttachedContainerForId(5))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(3))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(4))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(5))
// 4 come up, 1 pending
snapshotsStore.updatePod(pendingExecutor(1))
snapshotsStore.updatePod(runningExecutor(2))
@@ -685,7 +707,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
// We request pod 6
- verify(podOperations).create(podWithAttachedContainerForId(6))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForId(6))
}
test("SPARK-35416: Support PersistentVolumeClaim Reuse") {
@@ -715,18 +737,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
- when(podOperations
+ when(podsWithNamespace
.withField("status.phase", "Pending"))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
- .thenReturn(podOperations)
- when(podOperations
+ .thenReturn(labeledPods)
+ when(labeledPods
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
- .thenReturn(podOperations)
+ .thenReturn(labeledPods)
val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)
@@ -734,28 +756,27 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
// Target 1 executor, make sure it's requested, even with an empty initial snapshot.
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
- verify(podOperations).create(podWithAttachedContainerForIdAndVolume(1))
+ verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(1))
// Mark executor as running, verify that subsequent allocation cycle is a no-op.
snapshotsStore.updatePod(runningExecutor(1))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
- verify(podOperations, times(1)).create(any())
- verify(podOperations, never()).delete()
+ verify(podResource, times(1)).create()
+ verify(podResource, never()).delete()
// Request a new executor, make sure it's using reused PVC
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
- verify(podOperations).create(podWithAttachedContainerForIdAndVolume(2))
- verify(persistentVolumeClaims, never()).create(any())
+ verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(2))
+ verify(pvcWithNamespace, never()).resource(any())
}
test("print the pod name instead of Some(name) if pod is absent") {
val nonexistentPod = "i-do-not-exist"
val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, nonexistentPod)
- when(kubernetesClient.pods()).thenReturn(podOperations)
- when(podOperations.withName(nonexistentPod)).thenReturn(driverPodOperations)
+ when(podsWithNamespace.withName(nonexistentPod)).thenReturn(driverPodOperations)
when(driverPodOperations.get()).thenReturn(null)
val e = intercept[SparkException](new ExecutorPodsAllocator(
conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock))
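
The allocator-suite hunks above all encode the same fabric8 6.x change: the namespace is now selected explicitly and pods are created through resource(...).create() rather than create(pod). A minimal sketch of the mock wiring those verify() calls assume, using the mock names this patch introduces (podOperations, podsWithNamespace, podResource); the "default" namespace literal is an assumption mirrored from the sibling suites below, not necessarily this suite's exact before block:

    // 5.x: kubernetesClient.pods().create(pod)
    // 6.x: kubernetesClient.pods().inNamespace(ns).resource(pod).create()
    when(kubernetesClient.pods()).thenReturn(podOperations)
    when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
    when(podsWithNamespace.resource(any())).thenReturn(podResource)
    // which lets the tests verify the builder step and the terminal call separately:
    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
    verify(podResource, times(1)).create()
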
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index e3ec53adef6ab..92d692c829ae1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -16,11 +16,14 @@
*/
package org.apache.spark.scheduler.cluster.k8s
+import java.util.function.UnaryOperator
+
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
-import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.any
+import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mockito.{mock, never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@@ -37,7 +40,7 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter {
- private var namedExecutorPods: mutable.Map[String, PodResource[Pod]] = _
+ private var namedExecutorPods: mutable.Map[String, PodResource] = _
@Mock
private var kubernetesClient: KubernetesClient = _
@@ -45,6 +48,9 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
@Mock
private var podOperations: PODS = _
+ @Mock
+ private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
@Mock
private var schedulerBackend: KubernetesClusterSchedulerBackend = _
@@ -54,10 +60,11 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
before {
MockitoAnnotations.openMocks(this).close()
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
- namedExecutorPods = mutable.Map.empty[String, PodResource[Pod]]
+ namedExecutorPods = mutable.Map.empty[String, PodResource]
when(schedulerBackend.getExecutorsWithRegistrationTs()).thenReturn(Map.empty[String, Long])
when(kubernetesClient.pods()).thenReturn(podOperations)
- when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
+ when(podOperations.inNamespace(anyString())).thenReturn(podsWithNamespace)
+ when(podsWithNamespace.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
new SparkConf(),
kubernetesClient,
@@ -109,6 +116,12 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
}
+ test("SPARK-40458: test executor inactivation function") {
+ val failedPod = failedExecutorWithoutDeletion(1)
+ val inactivated = ExecutorPodsLifecycleManager.executorInactivationFn(failedPod)
+ assert(inactivated.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true")
+ }
+
test("Keep executor pods in k8s if configured.") {
val failedPod = failedExecutorWithoutDeletion(1)
eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false)
@@ -118,12 +131,8 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete()
-
- val podCaptor = ArgumentCaptor.forClass(classOf[Pod])
- verify(namedExecutorPods(failedPod.getMetadata.getName)).patch(podCaptor.capture())
-
- val pod = podCaptor.getValue()
- assert(pod.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true")
+ verify(namedExecutorPods(failedPod.getMetadata.getName))
+ .edit(any[UnaryOperator[Pod]]())
}
private def exitReasonMessage(execId: Int, failedPod: Pod, exitCode: Int): String = {
@@ -146,10 +155,10 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
""".stripMargin
}
- private def namedPodsAnswer(): Answer[PodResource[Pod]] =
+ private def namedPodsAnswer(): Answer[PodResource] =
(invocation: InvocationOnMock) => {
val podName: String = invocation.getArgument(0)
namedExecutorPods.getOrElseUpdate(
- podName, mock(classOf[PodResource[Pod]]))
+ podName, mock(classOf[PodResource]))
}
}
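
Because fabric8 6.x replaces patch(pod) with edit(UnaryOperator[Pod]), the edited Pod can no longer be captured directly, which is why the suite now tests ExecutorPodsLifecycleManager.executorInactivationFn on its own. A hedged alternative sketch, not part of this patch, would capture the operator handed to edit() and apply it; it would require keeping the ArgumentCaptor import removed above:

    val fnCaptor: ArgumentCaptor[UnaryOperator[Pod]] =
      ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]])
    verify(namedExecutorPods(failedPod.getMetadata.getName)).edit(fnCaptor.capture())
    // applying the captured function reproduces the label change requested of Kubernetes
    val inactivated = fnCaptor.getValue.apply(failedPod)
    assert(inactivated.getMetadata.getLabels.get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true")
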
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
index 8209bee7a02b7..61080268cde60 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
@@ -41,6 +41,9 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA
@Mock
private var podOperations: PODS = _
+ @Mock
+ private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
@Mock
private var appIdLabeledPods: LABELED_PODS = _
@@ -58,7 +61,8 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA
MockitoAnnotations.openMocks(this).close()
watch = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
when(kubernetesClient.pods()).thenReturn(podOperations)
- when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+ when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
+ when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(appIdLabeledPods)
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(executorRoleLabeledPods)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index c3af83118f18e..bb5e93c92acf0 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.Arrays
import java.util.concurrent.TimeUnit
-import io.fabric8.kubernetes.api.model.{ObjectMeta, Pod, PodList}
+import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodList}
import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
@@ -66,12 +66,21 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
@Mock
private var podOperations: PODS = _
+ @Mock
+ private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
@Mock
private var labeledPods: LABELED_PODS = _
@Mock
private var configMapsOperations: CONFIG_MAPS = _
+ @Mock
+ private var configMapsWithNamespace: CONFIG_MAPS_WITH_NAMESPACE = _
+
+ @Mock
+ private var configMapResource: CONFIG_MAPS_RESOURCE = _
+
@Mock
private var labeledConfigMaps: LABELED_CONFIG_MAPS = _
@@ -117,7 +126,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
driverEndpoint.capture()))
.thenReturn(driverEndpointRef)
when(kubernetesClient.pods()).thenReturn(podOperations)
+ when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
when(kubernetesClient.configMaps()).thenReturn(configMapsOperations)
+ when(configMapsOperations.inNamespace("default")).thenReturn(configMapsWithNamespace)
+ when(configMapsWithNamespace.resource(any[ConfigMap]())).thenReturn(configMapResource)
when(podAllocator.driverPod).thenReturn(None)
schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
taskScheduler,
@@ -142,13 +154,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
verify(lifecycleEventHandler).start(schedulerBackendUnderTest)
verify(watchEvents).start(TEST_SPARK_APP_ID)
verify(pollEvents).start(TEST_SPARK_APP_ID)
- verify(configMapsOperations).create(any())
+ verify(configMapResource).create()
}
test("Stop all components") {
- when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+ when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
- when(configMapsOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+ when(configMapsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(labeledConfigMaps)
when(labeledConfigMaps.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(labeledConfigMaps)
@@ -177,36 +189,14 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
test("Kill executors") {
schedulerBackendUnderTest.start()
- val operation = mock(classOf[NonNamespaceOperation[
- Pod, PodList, PodResource[Pod]]])
-
- when(podOperations.inNamespace(any())).thenReturn(operation)
- when(podOperations.withField(any(), any())).thenReturn(labeledPods)
- when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+ when(podsWithNamespace.withField(any(), any())).thenReturn(labeledPods)
+ when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
-
- val pod1 = mock(classOf[Pod])
- val pod1Metadata = mock(classOf[ObjectMeta])
- when(pod1Metadata.getNamespace).thenReturn("coffeeIsLife")
- when(pod1Metadata.getName).thenReturn("pod1")
- when(pod1.getMetadata).thenReturn(pod1Metadata)
-
- val pod2 = mock(classOf[Pod])
- val pod2Metadata = mock(classOf[ObjectMeta])
- when(pod2Metadata.getNamespace).thenReturn("coffeeIsLife")
- when(pod2Metadata.getName).thenReturn("pod2")
- when(pod2.getMetadata).thenReturn(pod2Metadata)
-
- val pod1op = mock(classOf[PodResource[Pod]])
- val pod2op = mock(classOf[PodResource[Pod]])
- when(operation.withName("pod1")).thenReturn(pod1op)
- when(operation.withName("pod2")).thenReturn(pod2op)
-
- val podList = mock(classOf[PodList])
- when(labeledPods.list()).thenReturn(podList)
- when(podList.getItems()).thenReturn(Arrays.asList[Pod]())
+ val pod1op = mock(classOf[PodResource])
+ val pod2op = mock(classOf[PodResource])
+ when(labeledPods.resources()).thenReturn(Arrays.asList[PodResource]().stream)
schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
TimeUnit.MILLISECONDS)
verify(labeledPods, never()).delete()
@@ -227,7 +217,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
verify(pod2op, never()).edit(any(
classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
- when(podList.getItems()).thenReturn(Arrays.asList(pod1))
+ when(labeledPods.resources()).thenReturn(Arrays.asList(pod1op).stream)
+ val podList = mock(classOf[PodList])
+ when(labeledPods.list()).thenReturn(podList)
+ val pod1 = mock(classOf[Pod])
+ val pod2 = mock(classOf[Pod])
+ when(podList.getItems).thenReturn(Arrays.asList(pod1, pod2))
+
schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
verify(labeledPods, never()).delete()
schedulerExecutorService.runUntilIdle()
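
The reworked "Kill executors" test above also reflects that a filtered pod operation in 6.x exposes resources(), a java.util.stream.Stream[PodResource], so each matching pod can be edited without listing it and resolving it again by name. A rough, hypothetical helper showing that shape (label constants come from the suite's imports; this is a sketch, not the scheduler backend's code verbatim):

    import java.util.function.UnaryOperator
    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.KubernetesClient
    import org.apache.spark.deploy.k8s.Constants._

    def inactivateExecutors(
        client: KubernetesClient,
        namespace: String,
        appId: String,
        execIds: Seq[String],
        inactivate: UnaryOperator[Pod]): Unit = {
      val matching = client.pods()
        .inNamespace(namespace)
        .withLabel(SPARK_APP_ID_LABEL, appId)
        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
        .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
        .resources()   // Stream[PodResource] in fabric8 6.x
        .iterator()
      while (matching.hasNext) {
        matching.next().edit(inactivate)   // edit each matching PodResource in place
      }
    }
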
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala
index 748f509e01303..f74d2c9feee04 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala
@@ -67,18 +67,25 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
private var appOperations: AppsAPIGroupDSL = _
@Mock
- private var statefulSetOperations: MixedOperation[
- apps.StatefulSet, apps.StatefulSetList, RollableScalableResource[apps.StatefulSet]] = _
+ private var statefulSetOperations: STATEFUL_SETS = _
@Mock
- private var editableSet: RollableScalableResource[apps.StatefulSet] = _
+ private var statefulSetNamespaced: STATEFUL_SETS_NAMESPACED = _
+
+ @Mock
+ private var editableSet: STATEFUL_SET_RES = _
@Mock
private var podOperations: PODS = _
+ @Mock
+ private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
+ @Mock
+ private var podResource: PodResource = _
@Mock
- private var driverPodOperations: PodResource[Pod] = _
+ private var driverPodOperations: PodResource = _
private var podsAllocatorUnderTest: StatefulSetPodsAllocator = _
@@ -102,10 +109,14 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
before {
MockitoAnnotations.openMocks(this).close()
when(kubernetesClient.pods()).thenReturn(podOperations)
+ when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
when(kubernetesClient.apps()).thenReturn(appOperations)
when(appOperations.statefulSets()).thenReturn(statefulSetOperations)
- when(statefulSetOperations.withName(any())).thenReturn(editableSet)
- when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
+ when(statefulSetOperations.inNamespace("default")).thenReturn(statefulSetNamespaced)
+ when(statefulSetNamespaced.resource(any())).thenReturn(editableSet)
+ when(statefulSetNamespaced.withName(any())).thenReturn(editableSet)
+ when(podsWithNamespace.withName(driverPodName)).thenReturn(driverPodOperations)
+ when(podsWithNamespace.resource(any())).thenReturn(podResource)
when(driverPodOperations.get).thenReturn(driverPod)
when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod)
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
@@ -128,7 +139,8 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
Map(defaultProfile -> (10),
immrprof -> (420)))
val captor = ArgumentCaptor.forClass(classOf[StatefulSet])
- verify(statefulSetOperations, times(2)).create(any())
+ verify(statefulSetNamespaced, times(2)).resource(any())
+ verify(editableSet, times(2)).create()
podsAllocatorUnderTest.stop(appId)
verify(editableSet, times(2)).delete()
}
@@ -137,7 +149,8 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest.setTotalExpectedExecutors(
Map(defaultProfile -> (10)))
val captor = ArgumentCaptor.forClass(classOf[StatefulSet])
- verify(statefulSetOperations, times(1)).create(captor.capture())
+ verify(statefulSetNamespaced, times(1)).resource(captor.capture())
+ verify(editableSet, times(1)).create()
val set = captor.getValue()
val setName = set.getMetadata().getName()
val namespace = set.getMetadata().getNamespace()
@@ -145,7 +158,7 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
val spec = set.getSpec()
assert(spec.getReplicas() === 10)
assert(spec.getPodManagementPolicy() === "Parallel")
- verify(podOperations, never()).create(any())
+ verify(podResource, never()).create()
podsAllocatorUnderTest.setTotalExpectedExecutors(
Map(defaultProfile -> (20)))
verify(editableSet, times(1)).scale(any(), any())
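
The StatefulSet changes follow the same split: creation goes through resource(...).create() on the namespaced operation, while scaling still goes through the resource looked up by name. A minimal sketch under those assumptions (the two-argument scale(count, wait) mirrors the scale(any(), any()) verification above; this is not the allocator's code verbatim):

    import io.fabric8.kubernetes.api.model.apps.StatefulSet
    import io.fabric8.kubernetes.client.KubernetesClient

    def createThenScale(
        client: KubernetesClient,
        namespace: String,
        set: StatefulSet,
        targetReplicas: Int): Unit = {
      val ops = client.apps().statefulSets().inNamespace(namespace)
      ops.resource(set).create()                                         // 6.x creation path
      ops.withName(set.getMetadata.getName).scale(targetReplicas, false) // scale the existing set
    }
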
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
index 1a9724afe30c9..93200ea1297d3 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
@@ -122,7 +122,8 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
.kubernetesClient
.services()
.inNamespace(kubernetesTestComponents.namespace)
- .delete(driverService)
+ .resource(driverService)
+ .delete()
// Delete all executors, since the test explicitly asks them not to be deleted by the app.
kubernetesTestComponents
.kubernetesClient
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
index 6a3dfd5bf791d..efda414318134 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
@@ -22,7 +22,7 @@ import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model.HasMetadata
import io.fabric8.kubernetes.client.{Watcher, WatcherException}
import io.fabric8.kubernetes.client.Watcher.Action
-import io.fabric8.kubernetes.client.internal.readiness.Readiness
+import io.fabric8.kubernetes.client.readiness.Readiness
private[spark] class SparkReadinessWatcher[T <: HasMetadata] extends Watcher[T] {