diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index 3d21dfd3a68a9..8a9621e5364be 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -24,7 +24,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar arrow-vector/9.0.0//arrow-vector-9.0.0.jar audience-annotations/0.5.0//audience-annotations-0.5.0.jar -automaton/1.11-8//automaton-1.11-8.jar avro-ipc/1.11.1//avro-ipc-1.11.1.jar avro-mapred/1.11.1//avro-mapred-1.11.1.jar avro/1.11.1//avro-1.11.1.jar @@ -69,7 +68,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar failureaccess/1.0.1//failureaccess-1.0.1.jar flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar gcs-connector/hadoop2-2.2.7/shaded/gcs-connector-hadoop2-2.2.7-shaded.jar -generex/1.0.2//generex-1.0.2.jar gmetric4j/1.0.10//gmetric4j-1.0.10.jar grpc-api/1.47.0//grpc-api-1.47.0.jar grpc-context/1.47.0//grpc-context-1.47.0.jar @@ -175,27 +173,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar jta/1.1//jta-1.1.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar -kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar -kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar -kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar -kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar -kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar -kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar -kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar -kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar -kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar -kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar -kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar -kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar -kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar -kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar -kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar -kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar -kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar -kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar -kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar -kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar -kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar +kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar +kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar +kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar +kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar +kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar +kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar +kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar +kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar +kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar +kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar +kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar +kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar 
+kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar +kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar +kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar +kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar +kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar +kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar +kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar +kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar +kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar +kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar +kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar +kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar lapack/3.0.2//lapack-3.0.2.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 1fab542bad938..c26dfa3f9ce7f 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -23,7 +23,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar arrow-vector/9.0.0//arrow-vector-9.0.0.jar audience-annotations/0.5.0//audience-annotations-0.5.0.jar -automaton/1.11-8//automaton-1.11-8.jar avro-ipc/1.11.1//avro-ipc-1.11.1.jar avro-mapred/1.11.1//avro-mapred-1.11.1.jar avro/1.11.1//avro-1.11.1.jar @@ -66,7 +65,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar failureaccess/1.0.1//failureaccess-1.0.1.jar flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar gcs-connector/hadoop3-2.2.7/shaded/gcs-connector-hadoop3-2.2.7-shaded.jar -generex/1.0.2//generex-1.0.2.jar gmetric4j/1.0.10//gmetric4j-1.0.10.jar grpc-api/1.47.0//grpc-api-1.47.0.jar grpc-context/1.47.0//grpc-context-1.47.0.jar @@ -159,27 +157,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar jta/1.1//jta-1.1.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar -kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar -kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar -kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar -kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar -kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar -kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar -kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar -kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar -kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar -kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar -kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar -kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar -kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar -kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar -kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar -kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar -kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar -kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar -kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar 
-kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar -kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar +kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar +kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar +kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar +kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar +kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar +kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar +kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar +kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar +kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar +kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar +kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar +kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar +kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar +kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar +kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar +kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar +kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar +kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar +kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar +kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar +kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar +kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar +kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar +kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar lapack/3.0.2//lapack-3.0.2.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar diff --git a/pom.xml b/pom.xml index 5fbd82ad57add..22abe0f7d3cb7 100644 --- a/pom.xml +++ b/pom.xml @@ -216,7 +216,7 @@ 9.0.0 org.fusesource.leveldbjni - 5.12.3 + 6.1.1 ${java.home} diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 1c729cc441ee3..30d5f7d2bb8f8 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -75,6 +75,11 @@ test + + io.fabric8 + kubernetes-httpclient-okhttp + ${kubernetes-client.version} + io.fabric8 kubernetes-client diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 51040857c64f8..0b806f046402e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -21,7 +21,7 @@ import java.io.File import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.base.Charsets import com.google.common.io.Files -import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient} +import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder} import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY import io.fabric8.kubernetes.client.Config.autoConfigure import 
io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory @@ -115,7 +115,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging { } logDebug("Kubernetes client config: " + new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config)) - new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config) + new KubernetesClientBuilder() + .withHttpClientFactory(factoryWithCustomDispatcher) + .withConfig(config) + .build() } private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala index 0238d5eafdea1..2ce6181a2fe9c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala @@ -19,9 +19,9 @@ package org.apache.spark.deploy.k8s.submit import scala.collection.JavaConverters._ import K8SSparkSubmitOperation.getGracePeriod -import io.fabric8.kubernetes.api.model.{Pod, PodList} +import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.KubernetesClient -import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource} +import io.fabric8.kubernetes.client.dsl.PodResource import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkSubmitOperation @@ -32,17 +32,15 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState import org.apache.spark.util.{CommandLineLoggingUtils, Utils} private sealed trait K8sSubmitOp extends CommandLineLoggingUtils { - type NON_NAMESPACED_PODS = - NonNamespaceOperation[Pod, PodList, PodResource[Pod]] def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf) (implicit client: KubernetesClient): Unit def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf) (implicit client: KubernetesClient): Unit - def listPodsInNameSpace(namespace: Option[String]) - (implicit client: KubernetesClient): NON_NAMESPACED_PODS = { + def getPod(namespace: Option[String], name: String) + (implicit client: KubernetesClient): PodResource = { namespace match { - case Some(ns) => client.pods.inNamespace(ns) - case None => client.pods + case Some(ns) => client.pods.inNamespace(ns).withName(name) + case None => client.pods.withName(name) } } } @@ -50,7 +48,7 @@ private sealed trait K8sSubmitOp extends CommandLineLoggingUtils { private class KillApplication extends K8sSubmitOp { override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf) (implicit client: KubernetesClient): Unit = { - val podToDelete = listPodsInNameSpace(namespace).withName(pName) + val podToDelete = getPod(namespace, pName) if (Option(podToDelete).isDefined) { getGracePeriod(sparkConf) match { @@ -66,19 +64,11 @@ private class KillApplication extends K8sSubmitOp { (implicit client: KubernetesClient): Unit = { if (pods.nonEmpty) { pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") } - val listedPods = listPodsInNameSpace(namespace) - getGracePeriod(sparkConf) match { case Some(period) => - // this is not using the batch api because no option is provided - // when using the grace period. 
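
For reference, the construction pattern adopted in SparkKubernetesClientFactory above follows the fabric8 6.x builder API; a minimal standalone sketch (the helper name and the bare OkHttpClientFactory are illustrative — the real code passes its dispatcher-customizing factory subclass):

import io.fabric8.kubernetes.client.{Config, KubernetesClient, KubernetesClientBuilder}
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory

// Build a client against the explicit okhttp backend supplied by the new
// kubernetes-httpclient-okhttp dependency, instead of new DefaultKubernetesClient(...).
def buildClient(config: Config): KubernetesClient =
  new KubernetesClientBuilder()
    .withHttpClientFactory(new OkHttpClientFactory())
    .withConfig(config)
    .build()
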
- pods.foreach { pod => - listedPods - .withName(pod.getMetadata.getName) - .withGracePeriod(period) - .delete() - } - case _ => listedPods.delete(pods.asJava) + client.resourceList(pods.asJava).withGracePeriod(period).delete() + case _ => + client.resourceList(pods.asJava).delete() } } else { printMessage("No applications found.") @@ -89,7 +79,7 @@ private class KillApplication extends K8sSubmitOp { private class ListStatus extends K8sSubmitOp { override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf) (implicit client: KubernetesClient): Unit = { - val pod = listPodsInNameSpace(namespace).withName(pName).get() + val pod = getPod(namespace, pName).get() if (Option(pod).isDefined) { printMessage("Application status (driver): " + Option(pod).map(formatPodState).getOrElse("unknown.")) @@ -145,13 +135,12 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation .pods } val pods = ops + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE) .list() .getItems .asScala .filter { pod => - val meta = pod.getMetadata - meta.getName.startsWith(pName.stripSuffix("*")) && - meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE + pod.getMetadata.getName.startsWith(pName.stripSuffix("*")) }.toList op.executeOnGlob(pods, namespace, sparkConf) } else { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 3a3ab081fe843..14d3c4d1f42f4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -149,7 +149,8 @@ private[spark] class Client( var watch: Watch = null var createdDriverPod: Pod = null try { - createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) + createdDriverPod = + kubernetesClient.pods().inNamespace(conf.namespace).resource(resolvedDriverPod).create() } catch { case NonFatal(e) => kubernetesClient.resourceList(preKubernetesResources: _*).delete() @@ -163,7 +164,7 @@ private[spark] class Client( kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace() } catch { case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) + kubernetesClient.pods().resource(createdDriverPod).delete() kubernetesClient.resourceList(preKubernetesResources: _*).delete() throw e } @@ -175,7 +176,7 @@ private[spark] class Client( kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() } catch { case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) + kubernetesClient.pods().resource(createdDriverPod).delete() throw e } @@ -185,6 +186,7 @@ private[spark] class Client( while (true) { val podWithName = kubernetesClient .pods() + .inNamespace(conf.namespace) .withName(driverPodName) // Reset resource to old before we start the watch, this is important for race conditions watcher.reset() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 9bdc30e446646..524ab0c845c6d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -76,6 +76,7 @@ class ExecutorPodsAllocator( val driverPod = kubernetesDriverPodName .map(name => Option(kubernetesClient.pods() + .inNamespace(namespace) .withName(name) .get()) .getOrElse(throw new SparkException( @@ -112,6 +113,7 @@ class ExecutorPodsAllocator( Utils.tryLogNonFatalError { kubernetesClient .pods() + .inNamespace(namespace) .withName(pod.getMetadata.getName) .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS) } @@ -185,6 +187,7 @@ class ExecutorPodsAllocator( Utils.tryLogNonFatalError { kubernetesClient .pods() + .inNamespace(namespace) .withLabel(SPARK_APP_ID_LABEL, applicationId) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*) @@ -299,6 +302,7 @@ class ExecutorPodsAllocator( Utils.tryLogNonFatalError { kubernetesClient .pods() + .inNamespace(namespace) .withField("status.phase", "Pending") .withLabel(SPARK_APP_ID_LABEL, applicationId) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) @@ -363,6 +367,7 @@ class ExecutorPodsAllocator( try { val createdPVCs = kubernetesClient .persistentVolumeClaims + .inNamespace(namespace) .withLabel("spark-app-selector", applicationId) .list() .getItems @@ -406,7 +411,8 @@ class ExecutorPodsAllocator( .build() val resources = replacePVCsIfNeeded( podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs) - val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer) + val createdExecutorPod = + kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create() try { addOwnerReference(createdExecutorPod, resources) resources @@ -418,13 +424,16 @@ class ExecutorPodsAllocator( val pvc = resource.asInstanceOf[PersistentVolumeClaim] logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " + s"StorageClass ${pvc.getSpec.getStorageClassName}") - kubernetesClient.persistentVolumeClaims().create(pvc) + kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create() } newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis()) logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") } catch { case NonFatal(e) => - kubernetesClient.pods().delete(createdExecutorPod) + kubernetesClient.pods() + .inNamespace(namespace) + .resource(createdExecutorPod) + .delete() throw e } } @@ -475,6 +484,7 @@ class ExecutorPodsAllocator( Utils.tryLogNonFatalError { kubernetesClient .pods() + .inNamespace(namespace) .withLabel(SPARK_APP_ID_LABEL, applicationId) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .delete() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index e255de4d2dd9e..7d32b35eab95f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.TimeUnit +import java.util.function.UnaryOperator import com.google.common.cache.CacheBuilder import 
io.fabric8.kubernetes.api.model.{Pod, PodBuilder} @@ -57,6 +58,8 @@ private[spark] class ExecutorPodsLifecycleManager( // This set is cleaned up when a snapshot containing the updated pod is processed. private val inactivatedPods = mutable.HashSet.empty[Long] + private val namespace = conf.get(KUBERNETES_NAMESPACE) + def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL) snapshotsStore.addSubscriber(eventProcessingInterval) { @@ -168,6 +171,7 @@ private[spark] class ExecutorPodsLifecycleManager( // of getting rid of the pod is what matters. kubernetesClient .pods() + .inNamespace(namespace) .withName(updatedPod.getMetadata.getName) .delete() } else if (!inactivatedPods.contains(execId) && !isPodInactive(updatedPod)) { @@ -175,16 +179,11 @@ private[spark] class ExecutorPodsLifecycleManager( // can be ignored in future updates from the API server. logDebug(s"Marking executor ${updatedPod.getMetadata.getName} as inactive since " + "deletion is disabled.") - val inactivatedPod = new PodBuilder(updatedPod) - .editMetadata() - .addToLabels(Map(SPARK_EXECUTOR_INACTIVE_LABEL -> "true").asJava) - .endMetadata() - .build() - kubernetesClient .pods() + .inNamespace(namespace) .withName(updatedPod.getMetadata.getName) - .patch(inactivatedPod) + .edit(executorInactivationFn) inactivatedPods += execId } @@ -274,4 +273,9 @@ private object ExecutorPodsLifecycleManager { s"${code}${humanStr}" } + def executorInactivationFn: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p) + .editOrNewMetadata() + .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true") + .endMetadata() + .build() } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index a334ece565377..4809222650d82 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.Watcher.Action import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Since, Stable} import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER +import org.apache.spark.deploy.k8s.Config.KUBERNETES_NAMESPACE import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -46,6 +47,8 @@ class ExecutorPodsWatchSnapshotSource( private var watchConnection: Closeable = _ private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER) + private val namespace = conf.get(KUBERNETES_NAMESPACE) + // If we're constructed with the old API get the SparkConf from the running SparkContext. 
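
The inactivation change in ExecutorPodsLifecycleManager above swaps the copy-and-patch idiom for edit() with a UnaryOperator[Pod]; a minimal sketch under that API (helper name and label key are illustrative — the real code uses SPARK_EXECUTOR_INACTIVE_LABEL):

import java.util.function.UnaryOperator

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient

def markInactive(client: KubernetesClient, ns: String, podName: String): Pod = {
  // Express the mutation as a function over the live pod; edit() applies it and
  // patches the result back, so no locally rebuilt Pod copy is needed.
  val addLabel: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
    .editOrNewMetadata()
      .addToLabels("spark-exec-inactive", "true") // illustrative label key
    .endMetadata()
    .build()
  client.pods().inNamespace(ns).withName(podName).edit(addLabel)
}
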
def this(snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) = { this(snapshotsStore, kubernetesClient, SparkContext.getOrCreate().conf) @@ -58,6 +61,7 @@ class ExecutorPodsWatchSnapshotSource( logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.") watchConnection = kubernetesClient.pods() + .inNamespace(namespace) .withLabel(SPARK_APP_ID_LABEL, applicationId) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .watch(new ExecutorPodsWatcher()) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 985b8b7bef051..4a8cb6d705051 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -19,7 +19,6 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.{ScheduledExecutorService, TimeUnit} import java.util.concurrent.atomic.AtomicInteger -import scala.collection.JavaConverters._ import scala.concurrent.Future import io.fabric8.kubernetes.api.model.Pod @@ -69,6 +68,8 @@ private[spark] class KubernetesClusterSchedulerBackend( private val defaultProfile = scheduler.sc.resourceProfileManager.defaultResourceProfile + private val namespace = conf.get(KUBERNETES_NAMESPACE) + // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = { removeExecutor(executorId, reason) @@ -77,7 +78,7 @@ private[spark] class KubernetesClusterSchedulerBackend( private def setUpExecutorConfigMap(driverPod: Option[Pod]): Unit = { val configMapName = KubernetesClientUtils.configMapNameExecutor val resolvedExecutorProperties = - Map(KUBERNETES_NAMESPACE.key -> conf.get(KUBERNETES_NAMESPACE)) + Map(KUBERNETES_NAMESPACE.key -> namespace) val confFilesMap = KubernetesClientUtils .buildSparkConfDirFilesMap(configMapName, conf, resolvedExecutorProperties) ++ resolvedExecutorProperties @@ -85,7 +86,7 @@ private[spark] class KubernetesClusterSchedulerBackend( Map(SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap, labels) KubernetesUtils.addOwnerReference(driverPod.orNull, Seq(configMap)) - kubernetesClient.configMaps().create(configMap) + kubernetesClient.configMaps().inNamespace(namespace).resource(configMap).create() } /** @@ -136,6 +137,7 @@ private[spark] class KubernetesClusterSchedulerBackend( Utils.tryLogNonFatalError { kubernetesClient .services() + .inNamespace(namespace) .withLabel(SPARK_APP_ID_LABEL, applicationId()) .delete() } @@ -145,6 +147,7 @@ private[spark] class KubernetesClusterSchedulerBackend( Utils.tryLogNonFatalError { kubernetesClient .persistentVolumeClaims() + .inNamespace(namespace) .withLabel(SPARK_APP_ID_LABEL, applicationId()) .delete() } @@ -158,6 +161,7 @@ private[spark] class KubernetesClusterSchedulerBackend( Utils.tryLogNonFatalError { kubernetesClient .configMaps() + .inNamespace(namespace) .withLabel(SPARK_APP_ID_LABEL, applicationId()) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .delete() @@ 
-193,22 +197,19 @@ private[spark] class KubernetesClusterSchedulerBackend( conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label => val labelTask = new Runnable() { override def run(): Unit = Utils.tryLogNonFatalError { - - val podsToLabel = kubernetesClient.pods() + kubernetesClient.pods() + .inNamespace(namespace) .withLabel(SPARK_APP_ID_LABEL, applicationId()) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*) - .list().getItems().asScala - - podsToLabel.foreach { pod => - kubernetesClient.pods() - .inNamespace(pod.getMetadata.getNamespace) - .withName(pod.getMetadata.getName) - .edit({p: Pod => new PodBuilder(p).editMetadata() - .addToLabels(label, - conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse("")) - .endMetadata() - .build()}) + .resources() + .forEach { podResource => + podResource.edit({ p: Pod => + new PodBuilder(p).editOrNewMetadata() + .addToLabels(label, + conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse("")) + .endMetadata() + .build()}) } } } @@ -246,6 +247,7 @@ private[spark] class KubernetesClusterSchedulerBackend( override def run(): Unit = Utils.tryLogNonFatalError { val running = kubernetesClient .pods() + .inNamespace(namespace) .withField("status.phase", "Running") .withLabel(SPARK_APP_ID_LABEL, applicationId()) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) @@ -302,6 +304,7 @@ private[spark] class KubernetesClusterSchedulerBackend( override def run(): Unit = Utils.tryLogNonFatalError { // Label the pod with it's exec ID kubernetesClient.pods() + .inNamespace(namespace) .withName(x.podName) .edit({p: Pod => new PodBuilder(p).editMetadata() .addToLabels(SPARK_EXECUTOR_ID_LABEL, newId) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala index 294ee70168b23..5eeab5501e47f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala @@ -53,6 +53,7 @@ class StatefulSetPodsAllocator( val driverPod = kubernetesDriverPodName .map(name => Option(kubernetesClient.pods() + .inNamespace(namespace) .withName(name) .get()) .getOrElse(throw new SparkException( @@ -69,6 +70,7 @@ class StatefulSetPodsAllocator( Utils.tryLogNonFatalError { kubernetesClient .pods() + .inNamespace(namespace) .withName(pod.getMetadata.getName) .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS) } @@ -99,7 +101,7 @@ class StatefulSetPodsAllocator( applicationId: String, resourceProfileId: Int): Unit = { if (setsCreated.contains(resourceProfileId)) { - val statefulset = kubernetesClient.apps().statefulSets().withName( + val statefulset = kubernetesClient.apps().statefulSets().inNamespace(namespace).withName( setName(applicationId, resourceProfileId: Int)) statefulset.scale(expected, false /* wait */) } else { @@ -169,7 +171,7 @@ class StatefulSetPodsAllocator( val statefulSet = new io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder() .withNewMetadata() .withName(setName(applicationId, resourceProfileId)) - .withNamespace(conf.get(KUBERNETES_NAMESPACE)) + .withNamespace(namespace) .endMetadata() .withNewSpec() .withPodManagementPolicy("Parallel") @@ -185,7 +187,7 @@ class StatefulSetPodsAllocator( 
.build() addOwnerReference(driverPod.get, Seq(statefulSet)) - kubernetesClient.apps().statefulSets().create(statefulSet) + kubernetesClient.apps().statefulSets().inNamespace(namespace).resource(statefulSet).create() setsCreated += (resourceProfileId) } } @@ -194,7 +196,12 @@ class StatefulSetPodsAllocator( // Cleanup the statefulsets when we stop setsCreated.foreach { rpid => Utils.tryLogNonFatalError { - kubernetesClient.apps().statefulSets().withName(setName(applicationId, rpid)).delete() + kubernetesClient + .apps() + .statefulSets() + .inNamespace(namespace) + .withName(setName(applicationId, rpid)) + .delete() } } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala index 14405da72810c..1a4bc9781da2f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala @@ -17,19 +17,31 @@ package org.apache.spark.deploy.k8s import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList} -import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, Resource} +import io.fabric8.kubernetes.api.model.apps.StatefulSet +import io.fabric8.kubernetes.api.model.apps.StatefulSetList +import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NonNamespaceOperation, PodResource, Resource, RollableScalableResource} object Fabric8Aliases { - type PODS = MixedOperation[Pod, PodList, PodResource[Pod]] + type PODS = MixedOperation[Pod, PodList, PodResource] + type PODS_WITH_NAMESPACE = NonNamespaceOperation[Pod, PodList, PodResource] type CONFIG_MAPS = MixedOperation[ ConfigMap, ConfigMapList, Resource[ConfigMap]] - type LABELED_PODS = FilterWatchListDeletable[Pod, PodList] - type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList] - type SINGLE_POD = PodResource[Pod] + type CONFIG_MAPS_WITH_NAMESPACE = + NonNamespaceOperation[ConfigMap, ConfigMapList, Resource[ConfigMap]] + type CONFIG_MAPS_RESOURCE = Resource[ConfigMap] + type LABELED_PODS = FilterWatchListDeletable[Pod, PodList, PodResource] + type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList, Resource[ConfigMap]] + type SINGLE_POD = PodResource type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[ HasMetadata] + type STATEFUL_SET_RES = RollableScalableResource[StatefulSet] + type STATEFUL_SETS = MixedOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES] + type STATEFUL_SETS_NAMESPACED = + NonNamespaceOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES] type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, PersistentVolumeClaimList, Resource[PersistentVolumeClaim]] - type LABELED_PERSISTENT_VOLUME_CLAIMS = - FilterWatchListDeletable[PersistentVolumeClaim, PersistentVolumeClaimList] + type PVC_WITH_NAMESPACE = NonNamespaceOperation[PersistentVolumeClaim, PersistentVolumeClaimList, + Resource[PersistentVolumeClaim]] + type LABELED_PERSISTENT_VOLUME_CLAIMS = FilterWatchListDeletable[PersistentVolumeClaim, + PersistentVolumeClaimList, Resource[PersistentVolumeClaim]] } diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala index 642c18db541e1..dc2d354af9891 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala @@ -20,12 +20,13 @@ import java.io.File import io.fabric8.kubernetes.api.model.{Config => _, _} import io.fabric8.kubernetes.client.KubernetesClient -import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource} +import io.fabric8.kubernetes.client.dsl.PodResource import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{mock, never, verify, when} import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.deploy.k8s.Fabric8Aliases._ import org.apache.spark.deploy.k8s.features.{KubernetesDriverCustomFeatureConfigStep, KubernetesExecutorCustomFeatureConfigStep, KubernetesFeatureConfigStep} import org.apache.spark.internal.config.ConfigEntry @@ -156,9 +157,8 @@ abstract class PodBuilderSuite extends SparkFunSuite { protected def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = { val kubernetesClient = mock(classOf[KubernetesClient]) - val pods = - mock(classOf[MixedOperation[Pod, PodList, PodResource[Pod]]]) - val podResource = mock(classOf[PodResource[Pod]]) + val pods = mock(classOf[PODS]) + val podResource = mock(classOf[PodResource]) when(kubernetesClient.pods()).thenReturn(pods) when(pods.load(any(classOf[File]))).thenReturn(podResource) when(podResource.get()).thenReturn(pod) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 12a5202b9d067..a8c25ab5002c0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -149,7 +149,10 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { private var podOperations: PODS = _ @Mock - private var namedPods: PodResource[Pod] = _ + private var podsWithNamespace: PODS_WITH_NAMESPACE = _ + + @Mock + private var namedPods: PodResource = _ @Mock private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _ @@ -170,11 +173,13 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)) when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)).thenReturn(BUILT_KUBERNETES_SPEC) when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.withName(POD_NAME)).thenReturn(namedPods) + when(podOperations.inNamespace(kconf.namespace)).thenReturn(podsWithNamespace) + when(podsWithNamespace.withName(POD_NAME)).thenReturn(namedPods) createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod]) createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) - when(podOperations.create(fullExpectedPod())).thenReturn(podWithOwnerReference()) + when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods) + when(namedPods.create()).thenReturn(podWithOwnerReference()) when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) 
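
The test rewiring in ClientSuite (and in the allocator suites below) mirrors the production call chain introduced earlier in this patch, where a concrete object is first bound with resource(...) and create()/delete() are then invoked on the returned PodResource; a minimal sketch with illustrative helper names:

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

// 6.x style: scope to a namespace, bind the pod object, then act on it,
// replacing the former pods().create(pod) / pods().delete(pod) calls.
def createDriverPod(client: KubernetesClient, ns: String, pod: Pod): Pod =
  client.pods().inNamespace(ns).resource(pod).create()

def deleteDriverPod(client: KubernetesClient, ns: String, pod: Pod): Unit =
  client.pods().inNamespace(ns).resource(pod).delete()
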
when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true) doReturn(resourceList) @@ -189,7 +194,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { kubernetesClient, loggingPodStatusWatcher) submissionClient.run() - verify(podOperations).create(fullExpectedPod()) + verify(podsWithNamespace).resource(fullExpectedPod()) + verify(namedPods).create() } test("The client should create Kubernetes resources") { @@ -298,8 +304,9 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { val expectedKeyToPaths = (expectedConfFiles.map(x => new KeyToPath(x, 420, x)).toList ++ List(KEY_TO_PATH)).sortBy(x => x.getKey) - when(podOperations.create(fullExpectedPod(expectedKeyToPaths))) - .thenReturn(podWithOwnerReference(expectedKeyToPaths)) + when(podsWithNamespace.resource(fullExpectedPod(expectedKeyToPaths))) + .thenReturn(namedPods) + when(namedPods.create()).thenReturn(podWithOwnerReference(expectedKeyToPaths)) kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf, resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala index 142d3fe112d69..3d30fb320d641 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala @@ -17,12 +17,13 @@ package org.apache.spark.deploy.k8s.submit import java.io.PrintStream +import java.util.Arrays import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.KubernetesClient -import io.fabric8.kubernetes.client.dsl.PodResource +import io.fabric8.kubernetes.client.{KubernetesClient, PropagationPolicyConfigurable} +import io.fabric8.kubernetes.client.dsl.{Deletable, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource} import org.mockito.{ArgumentMatchers, Mock, MockitoAnnotations} import org.mockito.Mockito.{times, verify, when} import org.scalatest.BeforeAndAfter @@ -30,9 +31,10 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.Config.KUBERNETES_SUBMIT_GRACE_PERIOD import org.apache.spark.deploy.k8s.Constants.{SPARK_APP_ID_LABEL, SPARK_POD_DRIVER_ROLE, SPARK_ROLE_LABEL} -import org.apache.spark.deploy.k8s.Fabric8Aliases.PODS +import org.apache.spark.deploy.k8s.Fabric8Aliases.{PODS, PODS_WITH_NAMESPACE} import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID + class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter { private val driverPodName1 = "driver1" private val driverPodName2 = "driver2" @@ -45,28 +47,39 @@ class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter { private var podOperations: PODS = _ @Mock - private var driverPodOperations1: PodResource[Pod] = _ + private var podsWithNamespace: PODS_WITH_NAMESPACE = _ + + @Mock + private var driverPodOperations1: PodResource = _ @Mock - private var driverPodOperations2: PodResource[Pod] = _ + private var driverPodOperations2: PodResource = _ @Mock private var kubernetesClient: KubernetesClient = _ + @Mock + private var deletable: PropagationPolicyConfigurable[_ <: Deletable] = _ + + @Mock + private var deletableList: + 
NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[HasMetadata] = _ + @Mock private var err: PrintStream = _ + private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*) + before { MockitoAnnotations.openMocks(this).close() when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.inNamespace(namespace)).thenReturn(podOperations) - when(podOperations.delete(podList.asJava)).thenReturn(true) - when(podOperations.withName(driverPodName1)).thenReturn(driverPodOperations1) - when(podOperations.withName(driverPodName2)).thenReturn(driverPodOperations2) + when(podOperations.inNamespace(namespace)).thenReturn(podsWithNamespace) + when(podsWithNamespace.withName(driverPodName1)).thenReturn(driverPodOperations1) + when(podsWithNamespace.withName(driverPodName2)).thenReturn(driverPodOperations2) when(driverPodOperations1.get).thenReturn(driverPod1) - when(driverPodOperations1.delete()).thenReturn(true) + when(driverPodOperations1.delete()).thenReturn(Arrays.asList(new StatusDetails)) when(driverPodOperations2.get).thenReturn(driverPod2) - when(driverPodOperations2.delete()).thenReturn(true) + when(driverPodOperations2.delete()).thenReturn(Arrays.asList(new StatusDetails)) } test("List app status") { @@ -101,18 +114,19 @@ class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter { implicit val kubeClient: KubernetesClient = kubernetesClient val killApp = new KillApplication val conf = new SparkConf().set(KUBERNETES_SUBMIT_GRACE_PERIOD, 1L) - when(driverPodOperations1.withGracePeriod(1L)).thenReturn(driverPodOperations1) + doReturn(deletable).when(driverPodOperations1).withGracePeriod(1L) killApp.executeOnPod(driverPodName1, Option(namespace), conf) verify(driverPodOperations1, times(1)).withGracePeriod(1L) - verify(driverPodOperations1, times(1)).delete() + verify(deletable, times(1)).delete() } test("Kill multiple apps with glob without gracePeriod") { implicit val kubeClient: KubernetesClient = kubernetesClient val killApp = new KillApplication killApp.printStream = err + doReturn(deletableList).when(kubernetesClient).resourceList(podList.asJava) killApp.executeOnGlob(podList, Option(namespace), new SparkConf()) - verify(podOperations, times(1)).delete(podList.asJava) + verify(deletableList, times(1)).delete() // scalastyle:off verify(err).println(ArgumentMatchers.eq(s"Deleting driver pod: $driverPodName1.")) verify(err).println(ArgumentMatchers.eq(s"Deleting driver pod: $driverPodName2.")) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 7ce0b57d1e9f3..caec9ef920121 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -26,7 +26,7 @@ import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException} import io.fabric8.kubernetes.client.dsl.PodResource import org.mockito.{Mock, MockitoAnnotations} -import org.mockito.ArgumentMatchers.{any, eq => meq} +import org.mockito.ArgumentMatchers.{any, anyString, eq => meq} import org.mockito.Mockito.{never, times, verify, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -77,9 +77,18 @@ 
class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { @Mock private var podOperations: PODS = _ + @Mock + private var podsWithNamespace: PODS_WITH_NAMESPACE = _ + + @Mock + private var podResource: PodResource = _ + @Mock private var persistentVolumeClaims: PERSISTENT_VOLUME_CLAIMS = _ + @Mock + private var pvcWithNamespace: PVC_WITH_NAMESPACE = _ + @Mock private var labeledPersistentVolumeClaims: LABELED_PERSISTENT_VOLUME_CLAIMS = _ @@ -90,7 +99,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { private var labeledPods: LABELED_PODS = _ @Mock - private var driverPodOperations: PodResource[Pod] = _ + private var driverPodOperations: PodResource = _ @Mock private var executorBuilder: KubernetesExecutorBuilder = _ @@ -107,7 +116,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { before { MockitoAnnotations.openMocks(this).close() when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) + when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace) + when(podsWithNamespace.withName(driverPodName)).thenReturn(driverPodOperations) + when(podsWithNamespace.resource(any())).thenReturn(podResource) + when(podsWithNamespace.withLabel(anyString(), anyString())).thenReturn(labeledPods) + when(podsWithNamespace.withLabelIn(anyString(), any())).thenReturn(labeledPods) + when(podsWithNamespace.withField(anyString(), anyString())).thenReturn(labeledPods) + when(labeledPods.withLabel(anyString(), anyString())).thenReturn(labeledPods) + when(labeledPods.withLabelIn(anyString(), any())).thenReturn(labeledPods) + when(labeledPods.withField(anyString(), anyString())).thenReturn(labeledPods) when(driverPodOperations.get).thenReturn(driverPod) when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod) when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr), @@ -119,7 +136,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims) - when(persistentVolumeClaims.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims) + when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace) + when(pvcWithNamespace.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims) when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList) when(persistentVolumeClaimList.getItems).thenReturn(Seq.empty[PersistentVolumeClaim].asJava) } @@ -170,9 +188,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) - verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id)) - verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id)) - verify(podOperations).create(podWithAttachedContainerForId(3, rp.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, defaultProfile.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id)) + verify(podResource, times(3)).create() // Mark executor 2 and 3 as 
pending, leave 1 as newly created but this does not free up // any pending pod slot so no new pod is requested @@ -180,8 +199,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(pendingExecutor(3, rp.id)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) - verify(podOperations, times(3)).create(any()) - verify(podOperations, never()).delete() + verify(podResource, times(3)).create() + verify(labeledPods, never()).delete() // Downscaling for defaultProfile resource ID with 1 executor to make one free slot // for pendings pods @@ -189,16 +208,16 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 3)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) - verify(podOperations).create(podWithAttachedContainerForId(4, rp.id)) - verify(podOperations, times(1)).delete() + verify(labeledPods, times(1)).delete() + verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, rp.id)) // Make one pod running this way we have one more free slot for pending pods snapshotsStore.updatePod(runningExecutor(3, rp.id)) snapshotsStore.updatePod(pendingExecutor(4, rp.id)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) - verify(podOperations).create(podWithAttachedContainerForId(5, rp.id)) - verify(podOperations, times(1)).delete() + verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, rp.id)) + verify(labeledPods, times(1)).delete() } test("Initially request executors in batches. Do not request another batch if the" + @@ -206,9 +225,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1))) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) for (nextId <- 1 to podAllocationSize) { - verify(podOperations).create(podWithAttachedContainerForId(nextId)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(nextId)) } - verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1)) + verify(podsWithNamespace, never()) + .resource(podWithAttachedContainerForId(podAllocationSize + 1)) } test("Request executors in batches. 
Allow another batch to be requested if" + @@ -225,15 +245,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 5) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) - verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1)) + verify(podsWithNamespace, never()) + .resource(podWithAttachedContainerForId(podAllocationSize + 1)) + verify(podResource, times(podAllocationSize)).create() snapshotsStore.updatePod(runningExecutor(podAllocationSize)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) - verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(podAllocationSize + 1)) snapshotsStore.updatePod(runningExecutor(podAllocationSize)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) - verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod])) + verify(podResource, times(podAllocationSize + 1)).create() } test("When a current batch reaches error states immediately, re-request" + @@ -248,14 +270,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(failedPod) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) - verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(podAllocationSize + 1)) } test("Verify stopping deletes the labeled pods") { - when(podOperations + when(podsWithNamespace .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) .thenReturn(labeledPods) podsAllocatorUnderTest.stop(TEST_SPARK_APP_ID) @@ -264,39 +286,39 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { test("When an executor is requested but the API does not report it in a reasonable time, retry" + " requesting that executor.") { - when(podOperations + when(podsWithNamespace .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")) .thenReturn(labeledPods) podsAllocatorUnderTest.setTotalExpectedExecutors( Map(defaultProfile -> 1)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) - verify(podOperations).create(podWithAttachedContainerForId(1)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(1)) waitForExecutorPodsClock.setTime(podCreationTimeout + 1) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(labeledPods).delete() - verify(podOperations).create(podWithAttachedContainerForId(2)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(2)) } test("SPARK-28487: scale up and down on target executor count changes") { - when(podOperations + when(podsWithNamespace .withField("status.phase", "Pending")) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods 
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) - .thenReturn(podOperations) + .thenReturn(labeledPods) val startTime = Instant.now.toEpochMilli waitForExecutorPodsClock.setTime(startTime) @@ -305,31 +327,31 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest.setTotalExpectedExecutors( Map(defaultProfile -> 1)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) - verify(podOperations).create(podWithAttachedContainerForId(1)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(1)) // Mark executor as running, verify that subsequent allocation cycle is a no-op. snapshotsStore.updatePod(runningExecutor(1)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) - verify(podOperations, times(1)).create(any()) - verify(podOperations, never()).delete() + verify(podResource, times(1)).create() + verify(labeledPods, never()).delete() // Request 3 more executors, make sure all are requested. podsAllocatorUnderTest.setTotalExpectedExecutors( Map(defaultProfile -> 4)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) - verify(podOperations).create(podWithAttachedContainerForId(2)) - verify(podOperations).create(podWithAttachedContainerForId(3)) - verify(podOperations).create(podWithAttachedContainerForId(4)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(2)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(3)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(4)) // Mark 2 as running, 3 as pending. Allocation cycle should do nothing. snapshotsStore.updatePod(runningExecutor(2)) snapshotsStore.updatePod(pendingExecutor(3)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2) - verify(podOperations, times(4)).create(any()) - verify(podOperations, never()).delete() + verify(podResource, times(4)).create() + verify(labeledPods, never()).delete() // Scale down to 1. Pending executors (both acknowledged and not) should be deleted. 
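
The withLabel/withLabelIn stubs above emulate the namespaced, label-filtered bulk deletion that ExecutorPodsAllocator performs; a minimal sketch (label keys and values written as plain strings in place of Spark's constants):

import io.fabric8.kubernetes.client.KubernetesClient

def deleteExecutorPods(client: KubernetesClient, ns: String, appId: String): Unit =
  client.pods()
    .inNamespace(ns)                            // namespace is now passed explicitly
    .withLabel("spark-app-selector", appId)     // SPARK_APP_ID_LABEL
    .withLabel("spark-role", "executor")        // SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE
    .delete()
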
waitForExecutorPodsClock.advance(executorIdleTimeout * 2) @@ -337,9 +359,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { Map(defaultProfile -> 1)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) - verify(podOperations, times(4)).create(any()) - verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4") - verify(podOperations).delete() + verify(podResource, times(4)).create() + verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4") + verify(labeledPods).delete() assert(podsAllocatorUnderTest.isDeleted("3")) assert(podsAllocatorUnderTest.isDeleted("4")) @@ -355,25 +377,25 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { } test("SPARK-34334: correctly identify timed out pending pod requests as excess") { - when(podOperations + when(podsWithNamespace .withField("status.phase", "Pending")) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) - .thenReturn(podOperations) + .thenReturn(labeledPods) val startTime = Instant.now.toEpochMilli waitForExecutorPodsClock.setTime(startTime) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) - verify(podOperations).create(podWithAttachedContainerForId(1)) - verify(podOperations).create(any()) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(1)) + verify(podResource).create() snapshotsStore.updatePod(pendingExecutor(1)) snapshotsStore.notifySubscribers() @@ -382,48 +404,48 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2)) snapshotsStore.notifySubscribers() - verify(podOperations).create(podWithAttachedContainerForId(2)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(2)) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) snapshotsStore.notifySubscribers() - verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1") - verify(podOperations, never()).delete() + verify(labeledPods, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1") + verify(labeledPods, never()).delete() waitForExecutorPodsClock.advance(executorIdleTimeout) snapshotsStore.notifySubscribers() // before SPARK-34334 this verify() call failed as the non-timed out newly created request // decreased the number of requests taken from timed out pending pod requests - verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1") - verify(podOperations).delete() + verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1") + verify(labeledPods).delete() } test("SPARK-33099: Respect executor idle timeout configuration") { - when(podOperations + when(podsWithNamespace .withField("status.phase", "Pending")) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + 
when(labeledPods .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) - .thenReturn(podOperations) + .thenReturn(labeledPods) val startTime = Instant.now.toEpochMilli waitForExecutorPodsClock.setTime(startTime) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 5)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) - verify(podOperations).create(podWithAttachedContainerForId(1)) - verify(podOperations).create(podWithAttachedContainerForId(2)) - verify(podOperations).create(podWithAttachedContainerForId(3)) - verify(podOperations).create(podWithAttachedContainerForId(4)) - verify(podOperations).create(podWithAttachedContainerForId(5)) - verify(podOperations, times(5)).create(any()) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(1)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(2)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(3)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(4)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(5)) + verify(podResource, times(5)).create() snapshotsStore.updatePod(pendingExecutor(1)) snapshotsStore.updatePod(pendingExecutor(2)) @@ -433,7 +455,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5") - verify(podOperations, never()).delete() + verify(podResource, never()).delete() // Newly created executors (both acknowledged and not) are cleaned up. waitForExecutorPodsClock.advance(executorIdleTimeout * 2) @@ -444,8 +466,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // though executor 1 is still in pending state and executor 3 and 4 are new request without // any state reported by kubernetes and all the three are already timed out assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) - verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "5") - verify(podOperations).delete() + verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "5") + verify(labeledPods).delete() } /** @@ -483,18 +505,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { * PODs: 8 and 9 */ test("SPARK-34361: scheduler backend known pods with multiple resource profiles at downscaling") { - when(podOperations + when(podsWithNamespace .withField("status.phase", "Pending")) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) - .thenReturn(podOperations) + .thenReturn(labeledPods) val startTime = Instant.now.toEpochMilli waitForExecutorPodsClock.setTime(startTime) @@ -510,20 +532,20 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // 0) request 3 PODs for the default and 4 PODs for the other resource profile podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 4)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 7) - verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id)) - 
verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id)) - verify(podOperations).create(podWithAttachedContainerForId(3, defaultProfile.id)) - verify(podOperations).create(podWithAttachedContainerForId(4, rp.id)) - verify(podOperations).create(podWithAttachedContainerForId(5, rp.id)) - verify(podOperations).create(podWithAttachedContainerForId(6, rp.id)) - verify(podOperations).create(podWithAttachedContainerForId(7, rp.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, defaultProfile.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, defaultProfile.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, rp.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, rp.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(6, rp.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(7, rp.id)) // 1) make 1 POD known by the scheduler backend for each resource profile when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "4")) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5, "scheduler backend known PODs are not outstanding") - verify(podOperations, times(7)).create(any()) + verify(podResource, times(7)).create() // 2) make 1 extra POD known by the scheduler backend for each resource profile // and make some to pending @@ -534,15 +556,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(pendingExecutor(6, rp.id)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) - verify(podOperations, times(7)).create(any()) + verify(podResource, times(7)).create() // 3) downscale to 1 POD for default and 1 POD for the other resource profile waitForExecutorPodsClock.advance(executorIdleTimeout * 2) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) - verify(podOperations, times(7)).create(any()) - verify(podOperations, times(2)).delete() + verify(podResource, times(7)).create() + verify(labeledPods, times(2)).delete() assert(podsAllocatorUnderTest.isDeleted("3")) assert(podsAllocatorUnderTest.isDeleted("6")) assert(podsAllocatorUnderTest.isDeleted("7")) @@ -551,32 +573,32 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // 2 PODs known by the scheduler backend there must be no new POD requested to be created podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 2)) snapshotsStore.notifySubscribers() - verify(podOperations, times(7)).create(any()) + verify(podResource, times(7)).create() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) - verify(podOperations, times(7)).create(any()) + verify(podResource, times(7)).create() // 5) requesting 1 more executor for each resource podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 3)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2) - verify(podOperations, times(9)).create(any()) - verify(podOperations).create(podWithAttachedContainerForId(8, defaultProfile.id)) - verify(podOperations).create(podWithAttachedContainerForId(9, rp.id)) + verify(podResource, times(9)).create() + 
verify(podsWithNamespace).resource(podWithAttachedContainerForId(8, defaultProfile.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(9, rp.id)) } test("SPARK-33288: multiple resource profiles") { - when(podOperations + when(podsWithNamespace .withField("status.phase", "Pending")) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) - .thenReturn(podOperations) + .thenReturn(labeledPods) val startTime = Instant.now.toEpochMilli waitForExecutorPodsClock.setTime(startTime) @@ -593,9 +615,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // make sure it's requested, even with an empty initial snapshot. podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 2)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) - verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id)) - verify(podOperations).create(podWithAttachedContainerForId(2, rp.id)) - verify(podOperations).create(podWithAttachedContainerForId(3, rp.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, rp.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id)) // Mark executor as running, verify that subsequent allocation cycle is a no-op. snapshotsStore.updatePod(runningExecutor(1, defaultProfile.id)) @@ -603,18 +625,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(runningExecutor(3, rp.id)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) - verify(podOperations, times(3)).create(any()) - verify(podOperations, never()).delete() + verify(podResource, times(3)).create() + verify(podResource, never()).delete() // Request 3 more executors for default profile and 1 more for other profile, // make sure all are requested. podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4, rp -> 3)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4) - verify(podOperations).create(podWithAttachedContainerForId(4, defaultProfile.id)) - verify(podOperations).create(podWithAttachedContainerForId(5, defaultProfile.id)) - verify(podOperations).create(podWithAttachedContainerForId(6, defaultProfile.id)) - verify(podOperations).create(podWithAttachedContainerForId(7, rp.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, defaultProfile.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, defaultProfile.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(6, defaultProfile.id)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(7, rp.id)) // Mark 4 as running, 5 and 7 as pending. Allocation cycle should do nothing. 
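The downscaling assertions rely on the same namespaced chain for bulk deletes: each `withLabel`/`withLabelIn` call narrows one filtered list (the `labeledPods` mock in the suite) and `delete()` removes whatever matches. A sketch of the equivalent direct call, with illustrative application and executor-id values standing in for the suite's constants:

  import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientBuilder}

  object DownscaleDeleteSketch {
    def main(args: Array[String]): Unit = {
      val client: KubernetesClient = new KubernetesClientBuilder().build()
      // Filters accumulate on the same list operation; delete() acts on the final selection.
      client.pods()
        .inNamespace("default")
        .withLabel("spark-app-selector", "app-1234")   // SPARK_APP_ID_LABEL in the suite
        .withLabel("spark-role", "executor")           // SPARK_ROLE_LABEL
        .withLabelIn("spark-exec-id", "5", "6")        // SPARK_EXECUTOR_ID_LABEL
        .delete()
    }
  }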
snapshotsStore.updatePod(runningExecutor(4, defaultProfile.id)) @@ -622,8 +644,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(pendingExecutor(7, rp.id)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) - verify(podOperations, times(7)).create(any()) - verify(podOperations, never()).delete() + verify(podResource, times(7)).create() + verify(podResource, never()).delete() // Scale down to 1 for both resource profiles. Pending executors // (both acknowledged and not) should be deleted. @@ -631,10 +653,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) - verify(podOperations, times(7)).create(any()) - verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6") - verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7") - verify(podOperations, times(2)).delete() + verify(podResource, times(7)).create() + verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6") + verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7") + verify(labeledPods, times(2)).delete() assert(podsAllocatorUnderTest.isDeleted("5")) assert(podsAllocatorUnderTest.isDeleted("6")) assert(podsAllocatorUnderTest.isDeleted("7")) @@ -653,27 +675,27 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { } test("SPARK-33262: pod allocator does not stall with pending pods") { - when(podOperations + when(podsWithNamespace .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")) .thenReturn(labeledPods) - when(podOperations + when(labeledPods .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "3", "4", "5", "6")) - .thenReturn(podOperations) + .thenReturn(labeledPods) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 6)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) // Initial request of pods - verify(podOperations).create(podWithAttachedContainerForId(1)) - verify(podOperations).create(podWithAttachedContainerForId(2)) - verify(podOperations).create(podWithAttachedContainerForId(3)) - verify(podOperations).create(podWithAttachedContainerForId(4)) - verify(podOperations).create(podWithAttachedContainerForId(5)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(1)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(2)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(3)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(4)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(5)) // 4 come up, 1 pending snapshotsStore.updatePod(pendingExecutor(1)) snapshotsStore.updatePod(runningExecutor(2)) @@ -685,7 +707,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2) // We request pod 6 - verify(podOperations).create(podWithAttachedContainerForId(6)) + verify(podsWithNamespace).resource(podWithAttachedContainerForId(6)) } test("SPARK-35416: Support 
PersistentVolumeClaim Reuse") { @@ -715,18 +737,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { kubernetesClient, snapshotsStore, waitForExecutorPodsClock) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) - when(podOperations + when(podsWithNamespace .withField("status.phase", "Pending")) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) - .thenReturn(podOperations) - when(podOperations + .thenReturn(labeledPods) + when(labeledPods .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) - .thenReturn(podOperations) + .thenReturn(labeledPods) val startTime = Instant.now.toEpochMilli waitForExecutorPodsClock.setTime(startTime) @@ -734,28 +756,27 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // Target 1 executor, make sure it's requested, even with an empty initial snapshot. podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) - verify(podOperations).create(podWithAttachedContainerForIdAndVolume(1)) + verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(1)) // Mark executor as running, verify that subsequent allocation cycle is a no-op. snapshotsStore.updatePod(runningExecutor(1)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) - verify(podOperations, times(1)).create(any()) - verify(podOperations, never()).delete() + verify(podResource, times(1)).create() + verify(podResource, never()).delete() // Request a new executor, make sure it's using reused PVC podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) - verify(podOperations).create(podWithAttachedContainerForIdAndVolume(2)) - verify(persistentVolumeClaims, never()).create(any()) + verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(2)) + verify(pvcWithNamespace, never()).resource(any()) } test("print the pod name instead of Some(name) if pod is absent") { val nonexistentPod = "i-do-not-exist" val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, nonexistentPod) - when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.withName(nonexistentPod)).thenReturn(driverPodOperations) + when(podsWithNamespace.withName(nonexistentPod)).thenReturn(driverPodOperations) when(driverPodOperations.get()).thenReturn(null) val e = intercept[SparkException](new ExecutorPodsAllocator( conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala index e3ec53adef6ab..92d692c829ae1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala @@ -16,11 +16,14 @@ */ package org.apache.spark.scheduler.cluster.k8s +import 
java.util.function.UnaryOperator + import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.PodResource -import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.{Mock, MockitoAnnotations} import org.mockito.ArgumentMatchers.any +import org.mockito.ArgumentMatchers.anyString import org.mockito.Mockito.{mock, never, times, verify, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -37,7 +40,7 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter { - private var namedExecutorPods: mutable.Map[String, PodResource[Pod]] = _ + private var namedExecutorPods: mutable.Map[String, PodResource] = _ @Mock private var kubernetesClient: KubernetesClient = _ @@ -45,6 +48,9 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte @Mock private var podOperations: PODS = _ + @Mock + private var podsWithNamespace: PODS_WITH_NAMESPACE = _ + @Mock private var schedulerBackend: KubernetesClusterSchedulerBackend = _ @@ -54,10 +60,11 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte before { MockitoAnnotations.openMocks(this).close() snapshotsStore = new DeterministicExecutorPodsSnapshotsStore() - namedExecutorPods = mutable.Map.empty[String, PodResource[Pod]] + namedExecutorPods = mutable.Map.empty[String, PodResource] when(schedulerBackend.getExecutorsWithRegistrationTs()).thenReturn(Map.empty[String, Long]) when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer()) + when(podOperations.inNamespace(anyString())).thenReturn(podsWithNamespace) + when(podsWithNamespace.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer()) eventHandlerUnderTest = new ExecutorPodsLifecycleManager( new SparkConf(), kubernetesClient, @@ -109,6 +116,12 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason) } + test("SPARK-40458: test executor inactivation function") { + val failedPod = failedExecutorWithoutDeletion(1) + val inactivated = ExecutorPodsLifecycleManager.executorInactivationFn(failedPod) + assert(inactivated.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true") + } + test("Keep executor pods in k8s if configured.") { val failedPod = failedExecutorWithoutDeletion(1) eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false) @@ -118,12 +131,8 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg) verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason) verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete() - - val podCaptor = ArgumentCaptor.forClass(classOf[Pod]) - verify(namedExecutorPods(failedPod.getMetadata.getName)).patch(podCaptor.capture()) - - val pod = podCaptor.getValue() - assert(pod.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true") + verify(namedExecutorPods(failedPod.getMetadata.getName)) + .edit(any[UnaryOperator[Pod]]()) } private def exitReasonMessage(execId: Int, failedPod: Pod, exitCode: Int): String = { @@ -146,10 +155,10 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte """.stripMargin } - private def 
namedPodsAnswer(): Answer[PodResource[Pod]] = + private def namedPodsAnswer(): Answer[PodResource] = (invocation: InvocationOnMock) => { val podName: String = invocation.getArgument(0) namedExecutorPods.getOrElseUpdate( - podName, mock(classOf[PodResource[Pod]])) + podName, mock(classOf[PodResource])) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala index 8209bee7a02b7..61080268cde60 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala @@ -41,6 +41,9 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA @Mock private var podOperations: PODS = _ + @Mock + private var podsWithNamespace: PODS_WITH_NAMESPACE = _ + @Mock private var appIdLabeledPods: LABELED_PODS = _ @@ -58,7 +61,8 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA MockitoAnnotations.openMocks(this).close() watch = ArgumentCaptor.forClass(classOf[Watcher[Pod]]) when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) + when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace) + when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) .thenReturn(appIdLabeledPods) when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) .thenReturn(executorRoleLabeledPods) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index c3af83118f18e..bb5e93c92acf0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -19,9 +19,9 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.Arrays import java.util.concurrent.TimeUnit -import io.fabric8.kubernetes.api.model.{ObjectMeta, Pod, PodList} +import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodList} import io.fabric8.kubernetes.client.KubernetesClient -import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource} +import io.fabric8.kubernetes.client.dsl.PodResource import org.jmock.lib.concurrent.DeterministicScheduler import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.ArgumentMatchers.{any, eq => mockitoEq} @@ -66,12 +66,21 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn @Mock private var podOperations: PODS = _ + @Mock + private var podsWithNamespace: PODS_WITH_NAMESPACE = _ + @Mock private var labeledPods: LABELED_PODS = _ @Mock private var configMapsOperations: CONFIG_MAPS = _ + @Mock + private var configMapsWithNamespace: CONFIG_MAPS_WITH_NAMESPACE = _ + + @Mock + private var configMapResource: CONFIG_MAPS_RESOURCE = _ + @Mock private var labeledConfigMaps: LABELED_CONFIG_MAPS = _ @@ -117,7 +126,10 @@ class 
KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn driverEndpoint.capture())) .thenReturn(driverEndpointRef) when(kubernetesClient.pods()).thenReturn(podOperations) + when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace) when(kubernetesClient.configMaps()).thenReturn(configMapsOperations) + when(configMapsOperations.inNamespace("default")).thenReturn(configMapsWithNamespace) + when(configMapsWithNamespace.resource(any[ConfigMap]())).thenReturn(configMapResource) when(podAllocator.driverPod).thenReturn(None) schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend( taskScheduler, @@ -142,13 +154,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn verify(lifecycleEventHandler).start(schedulerBackendUnderTest) verify(watchEvents).start(TEST_SPARK_APP_ID) verify(pollEvents).start(TEST_SPARK_APP_ID) - verify(configMapsOperations).create(any()) + verify(configMapResource).create() } test("Stop all components") { - when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods) + when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods) when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods) - when(configMapsOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) + when(configMapsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) .thenReturn(labeledConfigMaps) when(labeledConfigMaps.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) .thenReturn(labeledConfigMaps) @@ -177,36 +189,14 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn test("Kill executors") { schedulerBackendUnderTest.start() - val operation = mock(classOf[NonNamespaceOperation[ - Pod, PodList, PodResource[Pod]]]) - - when(podOperations.inNamespace(any())).thenReturn(operation) - when(podOperations.withField(any(), any())).thenReturn(labeledPods) - when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods) + when(podsWithNamespace.withField(any(), any())).thenReturn(labeledPods) + when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods) when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods) when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods) when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods) - - val pod1 = mock(classOf[Pod]) - val pod1Metadata = mock(classOf[ObjectMeta]) - when(pod1Metadata.getNamespace).thenReturn("coffeeIsLife") - when(pod1Metadata.getName).thenReturn("pod1") - when(pod1.getMetadata).thenReturn(pod1Metadata) - - val pod2 = mock(classOf[Pod]) - val pod2Metadata = mock(classOf[ObjectMeta]) - when(pod2Metadata.getNamespace).thenReturn("coffeeIsLife") - when(pod2Metadata.getName).thenReturn("pod2") - when(pod2.getMetadata).thenReturn(pod2Metadata) - - val pod1op = mock(classOf[PodResource[Pod]]) - val pod2op = mock(classOf[PodResource[Pod]]) - when(operation.withName("pod1")).thenReturn(pod1op) - when(operation.withName("pod2")).thenReturn(pod2op) - - val podList = mock(classOf[PodList]) - when(labeledPods.list()).thenReturn(podList) - when(podList.getItems()).thenReturn(Arrays.asList[Pod]()) + val pod1op = mock(classOf[PodResource]) + val pod2op = mock(classOf[PodResource]) + when(labeledPods.resources()).thenReturn(Arrays.asList[PodResource]().stream) 
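The scheduler-backend and lifecycle-manager suites lean on two further 6.1.1 idioms: `resources()` returns a stream of ready-to-use `PodResource` handles, replacing the old `list().getItems` plus per-pod `withName` lookup, and in-place label changes now go through `edit(UnaryOperator[Pod])` rather than `patch(pod)`. A sketch combining both, assuming a hypothetical inactivation label rather than Spark's actual constant:

  import java.util.function.UnaryOperator

  import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
  import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientBuilder}

  object EditExecutorsSketch {
    def main(args: Array[String]): Unit = {
      val client: KubernetesClient = new KubernetesClientBuilder().build()
      // edit() hands the live Pod to the function and applies the returned copy.
      val markInactive: UnaryOperator[Pod] = pod => new PodBuilder(pod)
        .editOrNewMetadata()
          .addToLabels("spark-exec-inactive", "true") // illustrative label key
        .endMetadata()
        .build()
      val it = client.pods()
        .inNamespace("default")
        .withLabel("spark-role", "executor")
        .withLabelIn("spark-exec-id", "1", "2")
        .resources()        // Stream of PodResource handles, no list().getItems round trip
        .iterator()
      while (it.hasNext) {
        it.next().edit(markInactive)
      }
    }
  }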
schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2, TimeUnit.MILLISECONDS) verify(labeledPods, never()).delete() @@ -227,7 +217,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn verify(pod2op, never()).edit(any( classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]])) - when(podList.getItems()).thenReturn(Arrays.asList(pod1)) + when(labeledPods.resources()).thenReturn(Arrays.asList(pod1op).stream) + val podList = mock(classOf[PodList]) + when(labeledPods.list()).thenReturn(podList) + val pod1 = mock(classOf[Pod]) + val pod2 = mock(classOf[Pod]) + when(podList.getItems).thenReturn(Arrays.asList(pod1, pod2)) + schedulerBackendUnderTest.doKillExecutors(Seq("1", "2")) verify(labeledPods, never()).delete() schedulerExecutorService.runUntilIdle() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala index 748f509e01303..f74d2c9feee04 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala @@ -67,18 +67,25 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter { private var appOperations: AppsAPIGroupDSL = _ @Mock - private var statefulSetOperations: MixedOperation[ - apps.StatefulSet, apps.StatefulSetList, RollableScalableResource[apps.StatefulSet]] = _ + private var statefulSetOperations: STATEFUL_SETS = _ @Mock - private var editableSet: RollableScalableResource[apps.StatefulSet] = _ + private var statefulSetNamespaced: STATEFUL_SETS_NAMESPACED = _ + + @Mock + private var editableSet: STATEFUL_SET_RES = _ @Mock private var podOperations: PODS = _ + @Mock + private var podsWithNamespace: PODS_WITH_NAMESPACE = _ + + @Mock + private var podResource: PodResource = _ @Mock - private var driverPodOperations: PodResource[Pod] = _ + private var driverPodOperations: PodResource = _ private var podsAllocatorUnderTest: StatefulSetPodsAllocator = _ @@ -102,10 +109,14 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter { before { MockitoAnnotations.openMocks(this).close() when(kubernetesClient.pods()).thenReturn(podOperations) + when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace) when(kubernetesClient.apps()).thenReturn(appOperations) when(appOperations.statefulSets()).thenReturn(statefulSetOperations) - when(statefulSetOperations.withName(any())).thenReturn(editableSet) - when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) + when(statefulSetOperations.inNamespace("default")).thenReturn(statefulSetNamespaced) + when(statefulSetNamespaced.resource(any())).thenReturn(editableSet) + when(statefulSetNamespaced.withName(any())).thenReturn(editableSet) + when(podsWithNamespace.withName(driverPodName)).thenReturn(driverPodOperations) + when(podsWithNamespace.resource(any())).thenReturn(podResource) when(driverPodOperations.get).thenReturn(driverPod) when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod) when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr), @@ -128,7 +139,8 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter { Map(defaultProfile -> 
(10), immrprof -> (420))) val captor = ArgumentCaptor.forClass(classOf[StatefulSet]) - verify(statefulSetOperations, times(2)).create(any()) + verify(statefulSetNamespaced, times(2)).resource(any()) + verify(editableSet, times(2)).create() podsAllocatorUnderTest.stop(appId) verify(editableSet, times(2)).delete() } @@ -137,7 +149,8 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest.setTotalExpectedExecutors( Map(defaultProfile -> (10))) val captor = ArgumentCaptor.forClass(classOf[StatefulSet]) - verify(statefulSetOperations, times(1)).create(captor.capture()) + verify(statefulSetNamespaced, times(1)).resource(captor.capture()) + verify(editableSet, times(1)).create() val set = captor.getValue() val setName = set.getMetadata().getName() val namespace = set.getMetadata().getNamespace() @@ -145,7 +158,7 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter { val spec = set.getSpec() assert(spec.getReplicas() === 10) assert(spec.getPodManagementPolicy() === "Parallel") - verify(podOperations, never()).create(any()) + verify(podResource, never()).create() podsAllocatorUnderTest.setTotalExpectedExecutors( Map(defaultProfile -> (20))) verify(editableSet, times(1)).scale(any(), any()) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala index 1a9724afe30c9..93200ea1297d3 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala @@ -122,7 +122,8 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite => .kubernetesClient .services() .inNamespace(kubernetesTestComponents.namespace) - .delete(driverService) + .resource(driverService) + .delete() // Delete all executors, since the test explicitly asks them not to be deleted by the app. kubernetesTestComponents .kubernetesClient diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala index 6a3dfd5bf791d..efda414318134 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala @@ -22,7 +22,7 @@ import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model.HasMetadata import io.fabric8.kubernetes.client.{Watcher, WatcherException} import io.fabric8.kubernetes.client.Watcher.Action -import io.fabric8.kubernetes.client.internal.readiness.Readiness +import io.fabric8.kubernetes.client.readiness.Readiness private[spark] class SparkReadinessWatcher[T <: HasMetadata] extends Watcher[T] {
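The final hunk reflects the relocation of the readiness helper in 6.x: `Readiness` now lives under `io.fabric8.kubernetes.client.readiness` instead of the old `internal.readiness` package, while the check itself is unchanged. A small sketch of the kind of usage the watcher depends on (the helper object and method name here are illustrative):

  import io.fabric8.kubernetes.api.model.HasMetadata
  // Before 6.x this import was io.fabric8.kubernetes.client.internal.readiness.Readiness.
  import io.fabric8.kubernetes.client.readiness.Readiness

  object ReadinessSketch {
    // Mirrors the readiness check performed on each watch event for a received resource.
    def isReady(resource: HasMetadata): Boolean = Readiness.getInstance().isReady(resource)
  }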