Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-40458][K8S] Bump Kubernetes Client Version to 6.1.1 #37990

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions dev/deps/spark-deps-hadoop-2-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
arrow-vector/9.0.0//arrow-vector-9.0.0.jar
audience-annotations/0.5.0//audience-annotations-0.5.0.jar
automaton/1.11-8//automaton-1.11-8.jar
avro-ipc/1.11.1//avro-ipc-1.11.1.jar
avro-mapred/1.11.1//avro-mapred-1.11.1.jar
avro/1.11.1//avro-1.11.1.jar
Expand Down Expand Up @@ -69,7 +68,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
gcs-connector/hadoop2-2.2.7/shaded/gcs-connector-hadoop2-2.2.7-shaded.jar
generex/1.0.2//generex-1.0.2.jar
gmetric4j/1.0.10//gmetric4j-1.0.10.jar
grpc-api/1.47.0//grpc-api-1.47.0.jar
grpc-context/1.47.0//grpc-context-1.47.0.jar
Expand Down Expand Up @@ -175,27 +173,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
lapack/3.0.2//lapack-3.0.2.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
Expand Down
47 changes: 24 additions & 23 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
arrow-vector/9.0.0//arrow-vector-9.0.0.jar
audience-annotations/0.5.0//audience-annotations-0.5.0.jar
automaton/1.11-8//automaton-1.11-8.jar
avro-ipc/1.11.1//avro-ipc-1.11.1.jar
avro-mapred/1.11.1//avro-mapred-1.11.1.jar
avro/1.11.1//avro-1.11.1.jar
Expand Down Expand Up @@ -66,7 +65,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
gcs-connector/hadoop3-2.2.7/shaded/gcs-connector-hadoop3-2.2.7-shaded.jar
generex/1.0.2//generex-1.0.2.jar
gmetric4j/1.0.10//gmetric4j-1.0.10.jar
grpc-api/1.47.0//grpc-api-1.47.0.jar
grpc-context/1.47.0//grpc-context-1.47.0.jar
Expand Down Expand Up @@ -159,27 +157,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
lapack/3.0.2//lapack-3.0.2.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@
<arrow.version>9.0.0</arrow.version>
<!-- org.fusesource.leveldbjni will be used except on arm64 platform. -->
<leveldbjni.group>org.fusesource.leveldbjni</leveldbjni.group>
<kubernetes-client.version>5.12.3</kubernetes-client.version>
<kubernetes-client.version>6.1.1</kubernetes-client.version>

<test.java.home>${java.home}</test.java.home>

Expand Down
5 changes: 5 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
<scope>test</scope>
</dependency>

<dependency>
Copy link
Contributor Author

@attilapiros attilapiros Sep 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/fabric8io/kubernetes-client/blob/master/doc/MIGRATION-v6.md#okhttp-httpclient:

The -client dependencies still default to the OkHttp client. If you are doing any customization to OkHttp clients directly, you'll need to include the kubernetes-httpclient-okhttp dependency in the compile scope - instead of the default runtime scope

<groupId>io.fabric8</groupId>
<artifactId>kubernetes-httpclient-okhttp</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.File
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
Expand Down Expand Up @@ -115,7 +115,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
}
logDebug("Kubernetes client config: " +
new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config))
new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config)
new KubernetesClientBuilder()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of apiimpl-split:

When you rely solely on a compile dependency to the respective -api dependencies you will not be able to use DefaultKubernetesClient nor DefaultOpenShiftClient directly to create your client instances. You should instead use KubernetesClientBuilder.

.withHttpClientFactory(factoryWithCustomDispatcher)
.withConfig(config)
.build()
}

private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package org.apache.spark.deploy.k8s.submit
import scala.collection.JavaConverters._

import K8SSparkSubmitOperation.getGracePeriod
import io.fabric8.kubernetes.api.model.{Pod, PodList}
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
import io.fabric8.kubernetes.client.dsl.PodResource

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitOperation
Expand All @@ -32,25 +32,23 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
import org.apache.spark.util.{CommandLineLoggingUtils, Utils}

private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
type NON_NAMESPACED_PODS =
NonNamespaceOperation[Pod, PodList, PodResource[Pod]]
def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
def listPodsInNameSpace(namespace: Option[String])
(implicit client: KubernetesClient): NON_NAMESPACED_PODS = {
def getPod(namespace: Option[String], name: String)
(implicit client: KubernetesClient): PodResource = {
namespace match {
case Some(ns) => client.pods.inNamespace(ns)
case None => client.pods
case Some(ns) => client.pods.inNamespace(ns).withName(name)
case None => client.pods.withName(name)
}
}
}

private class KillApplication extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
val podToDelete = listPodsInNameSpace(namespace).withName(pName)
val podToDelete = getPod(namespace, pName)

if (Option(podToDelete).isDefined) {
getGracePeriod(sparkConf) match {
Expand All @@ -66,19 +64,11 @@ private class KillApplication extends K8sSubmitOp {
(implicit client: KubernetesClient): Unit = {
if (pods.nonEmpty) {
pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") }
val listedPods = listPodsInNameSpace(namespace)

getGracePeriod(sparkConf) match {
case Some(period) =>
// this is not using the batch api because no option is provided
// when using the grace period.
pods.foreach { pod =>
listedPods
.withName(pod.getMetadata.getName)
.withGracePeriod(period)
.delete()
}
case _ => listedPods.delete(pods.asJava)
client.resourceList(pods.asJava).withGracePeriod(period).delete()
case _ =>
client.resourceList(pods.asJava).delete()
}
} else {
printMessage("No applications found.")
Expand All @@ -89,7 +79,7 @@ private class KillApplication extends K8sSubmitOp {
private class ListStatus extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
val pod = listPodsInNameSpace(namespace).withName(pName).get()
val pod = getPod(namespace, pName).get()
if (Option(pod).isDefined) {
printMessage("Application status (driver): " +
Option(pod).map(formatPodState).getOrElse("unknown."))
Expand Down Expand Up @@ -145,13 +135,12 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
.pods
}
val pods = ops
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
.list()
.getItems
.asScala
.filter { pod =>
val meta = pod.getMetadata
meta.getName.startsWith(pName.stripSuffix("*")) &&
meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
pod.getMetadata.getName.startsWith(pName.stripSuffix("*"))
}.toList
op.executeOnGlob(pods, namespace, sparkConf)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ private[spark] class Client(
var watch: Watch = null
var createdDriverPod: Pod = null
try {
createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
createdDriverPod =
kubernetesClient.pods().inNamespace(conf.namespace).resource(resolvedDriverPod).create()
} catch {
case NonFatal(e) =>
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
Expand All @@ -163,7 +164,7 @@ private[spark] class Client(
kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
kubernetesClient.pods().resource(createdDriverPod).delete()
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
throw e
}
Expand All @@ -175,7 +176,7 @@ private[spark] class Client(
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
kubernetesClient.pods().resource(createdDriverPod).delete()
throw e
}

Expand All @@ -185,6 +186,7 @@ private[spark] class Client(
while (true) {
val podWithName = kubernetesClient
.pods()
.inNamespace(conf.namespace)
.withName(driverPodName)
// Reset resource to old before we start the watch, this is important for race conditions
watcher.reset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class ExecutorPodsAllocator(

val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
.inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
Expand Down Expand Up @@ -112,6 +113,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withName(pod.getMetadata.getName)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
Expand Down Expand Up @@ -185,6 +187,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
Expand Down Expand Up @@ -299,6 +302,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
Expand Down Expand Up @@ -363,6 +367,7 @@ class ExecutorPodsAllocator(
try {
val createdPVCs = kubernetesClient
.persistentVolumeClaims
.inNamespace(namespace)
.withLabel("spark-app-selector", applicationId)
.list()
.getItems
Expand Down Expand Up @@ -406,7 +411,8 @@ class ExecutorPodsAllocator(
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
val createdExecutorPod =
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
try {
addOwnerReference(createdExecutorPod, resources)
resources
Expand All @@ -418,13 +424,16 @@ class ExecutorPodsAllocator(
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
s"StorageClass ${pvc.getSpec.getStorageClassName}")
kubernetesClient.persistentVolumeClaims().create(pvc)
kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdExecutorPod)
kubernetesClient.pods()
.inNamespace(namespace)
.resource(createdExecutorPod)
.delete()
throw e
}
}
Expand Down Expand Up @@ -475,6 +484,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
Expand Down
Loading