[SPARK-40458][K8S] Bump Kubernetes Client Version to 6.1.1 #37990

Closed · wants to merge 8 commits (changes shown are from 1 commit)
2 changes: 1 addition & 1 deletion pom.xml
@@ -214,7 +214,7 @@
<arrow.version>9.0.0</arrow.version>
<!-- org.fusesource.leveldbjni will be used except on arm64 platform. -->
<leveldbjni.group>org.fusesource.leveldbjni</leveldbjni.group>
<kubernetes-client.version>5.12.3</kubernetes-client.version>
<kubernetes-client.version>6.1.1</kubernetes-client.version>

<test.java.home>${java.home}</test.java.home>

5 changes: 5 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
@@ -75,6 +75,11 @@
<scope>test</scope>
</dependency>

<dependency>
@attilapiros (Contributor, Author) commented on Sep 25, 2022:

https://github.com/fabric8io/kubernetes-client/blob/master/doc/MIGRATION-v6.md#okhttp-httpclient:

The -client dependencies still default to the OkHttp client. If you are doing any customization to OkHttp clients directly, you'll need to include the kubernetes-httpclient-okhttp dependency in the compile scope - instead of the default runtime scope.
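For context, a minimal Scala sketch of the kind of direct OkHttp customization the migration note refers to (assuming the `OkHttpClientFactory.additionalConfig` override hook; the thread pool here is illustrative, not the exact Spark code):

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
import okhttp3.{Dispatcher, OkHttpClient}

object OkHttpCustomizationSketch {
  // Subclass the OkHttp factory so the generated HTTP client uses a custom
  // Dispatcher. Compiling against OkHttpClientFactory like this is exactly
  // why kubernetes-httpclient-okhttp has to be on the compile classpath.
  val factoryWithCustomDispatcher: OkHttpClientFactory = new OkHttpClientFactory() {
    override protected def additionalConfig(builder: OkHttpClient.Builder): Unit = {
      val executor = new ThreadPoolExecutor(
        0, Int.MaxValue, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
      builder.dispatcher(new Dispatcher(executor))
    }
  }
}
```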

<groupId>io.fabric8</groupId>
<artifactId>kubernetes-httpclient-okhttp</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
@@ -21,7 +21,7 @@ import java.io.File
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
@@ -115,7 +115,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
}
logDebug("Kubernetes client config: " +
new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config))
new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config)
new KubernetesClientBuilder()
@attilapiros (Contributor, Author) commented:

Because of the apiimpl-split change:

When you rely solely on a compile dependency to the respective -api dependencies you will not be able to use DefaultKubernetesClient nor DefaultOpenShiftClient directly to create your client instances. You should instead use KubernetesClientBuilder.
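A minimal before/after sketch of the builder migration (illustrative; `Config.autoConfigure` is the same helper this file already imports):

```scala
import io.fabric8.kubernetes.client.{Config, KubernetesClient, KubernetesClientBuilder}

object ClientBuilderSketch {
  // 5.x style (no longer visible from the -api artifacts alone):
  //   val client: KubernetesClient = new DefaultKubernetesClient(config)

  // 6.x style: always go through the builder.
  val config: Config = Config.autoConfigure(null)
  val client: KubernetesClient = new KubernetesClientBuilder()
    .withConfig(config)
    .build()
}
```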

.withHttpClientFactory(factoryWithCustomDispatcher)
.withConfig(config)
.build()
}

private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.k8s.submit
import scala.collection.JavaConverters._

import K8SSparkSubmitOperation.getGracePeriod
import io.fabric8.kubernetes.api.model.{Pod, PodList}
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
import io.fabric8.kubernetes.client.dsl.PodResource

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitOperation
@@ -32,25 +32,23 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
import org.apache.spark.util.{CommandLineLoggingUtils, Utils}

private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
type NON_NAMESPACED_PODS =
NonNamespaceOperation[Pod, PodList, PodResource[Pod]]
def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
def listPodsInNameSpace(namespace: Option[String])
(implicit client: KubernetesClient): NON_NAMESPACED_PODS = {
def getPod(namespace: Option[String], name: String)
(implicit client: KubernetesClient): PodResource = {
namespace match {
case Some(ns) => client.pods.inNamespace(ns)
case None => client.pods
case Some(ns) => client.pods.inNamespace(ns).withName(name)
case None => client.pods.withName(name)
}
}
}

private class KillApplication extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
val podToDelete = listPodsInNameSpace(namespace).withName(pName)
val podToDelete = getPod(namespace, pName)

if (Option(podToDelete).isDefined) {
getGracePeriod(sparkConf) match {
@@ -66,19 +64,11 @@ private class KillApplication extends K8sSubmitOp {
(implicit client: KubernetesClient): Unit = {
if (pods.nonEmpty) {
pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") }
val listedPods = listPodsInNameSpace(namespace)

getGracePeriod(sparkConf) match {
case Some(period) =>
// this is not using the batch api because no option is provided
// when using the grace period.
pods.foreach { pod =>
listedPods
.withName(pod.getMetadata.getName)
.withGracePeriod(period)
.delete()
}
case _ => listedPods.delete(pods.asJava)
client.resourceList(pods.asJava).withGracePeriod(period).delete()
case _ =>
client.resourceList(pods.asJava).delete()
}
} else {
printMessage("No applications found.")
@@ -89,7 +79,7 @@ private class KillApplication extends K8sSubmitOp {
private class ListStatus extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
val pod = listPodsInNameSpace(namespace).withName(pName).get()
val pod = getPod(namespace, pName).get()
if (Option(pod).isDefined) {
printMessage("Application status (driver): " +
Option(pod).map(formatPodState).getOrElse("unknown."))
@@ -144,14 +134,13 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
kubernetesClient
.pods
}
ops.withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
@attilapiros (Contributor, Author) commented:

Do the label-based filtering as early as possible.
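Roughly the difference, as a sketch (the label key and value here are stand-ins for Spark's SPARK_ROLE_LABEL and SPARK_POD_DRIVER_ROLE constants):

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

object LabelFilteringSketch {
  // Stand-in label key/value; Spark uses its own constants.
  val roleLabel = "spark-role"
  val driverRole = "driver"

  // Server side: the label selector travels with the LIST request, so only
  // driver pods come back from the API server.
  def driverPodsServerSide(client: KubernetesClient): List[Pod] =
    client.pods()
      .withLabel(roleLabel, driverRole)
      .list()
      .getItems
      .asScala
      .toList

  // Client side (the old shape for the role label): list everything, then
  // drop non-driver pods in memory afterwards.
  def driverPodsClientSide(client: KubernetesClient): List[Pod] =
    client.pods()
      .list()
      .getItems
      .asScala
      .filter(_.getMetadata.getLabels.get(roleLabel) == driverRole)
      .toList
}
```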

val pods = ops
.list()
.getItems
.asScala
.filter { pod =>
val meta = pod.getMetadata
meta.getName.startsWith(pName.stripSuffix("*")) &&
meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
pod.getMetadata.getName.startsWith(pName.stripSuffix("*"))
}.toList
op.executeOnGlob(pods, namespace, sparkConf)
} else {
@@ -149,7 +149,8 @@ private[spark] class Client(
var watch: Watch = null
var createdDriverPod: Pod = null
try {
createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
createdDriverPod =
kubernetesClient.pods().inNamespace(conf.namespace).resource(resolvedDriverPod).create()
} catch {
case NonFatal(e) =>
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
@@ -163,7 +164,7 @@
kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
kubernetesClient.pods().resource(createdDriverPod).delete()
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
throw e
}
@@ -175,7 +176,7 @@
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
kubernetesClient.pods().resource(createdDriverPod).delete()
throw e
}

@@ -185,6 +186,7 @@
while (true) {
val podWithName = kubernetesClient
.pods()
.inNamespace(conf.namespace)
.withName(driverPodName)
// Reset resource to old before we start the watch, this is important for race conditions
watcher.reset()
@@ -76,6 +76,7 @@ class ExecutorPodsAllocator(

val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
.inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
@@ -112,6 +113,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withName(pod.getMetadata.getName)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
@@ -185,6 +187,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
@@ -299,6 +302,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
@@ -406,7 +410,8 @@ class ExecutorPodsAllocator(
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
val createdExecutorPod =
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
try {
addOwnerReference(createdExecutorPod, resources)
resources
@@ -418,13 +423,16 @@ class ExecutorPodsAllocator(
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
s"StorageClass ${pvc.getSpec.getStorageClassName}")
kubernetesClient.persistentVolumeClaims().create(pvc)
kubernetesClient.persistentVolumeClaims().resource(pvc).create()
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdExecutorPod)
kubernetesClient.pods()
.inNamespace(namespace)
.resource(createdExecutorPod)
.delete()
throw e
}
}
@@ -475,6 +483,7 @@ class ExecutorPodsAllocator(
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
@@ -17,6 +17,7 @@
package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.TimeUnit
import java.util.function.UnaryOperator

import com.google.common.cache.CacheBuilder
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
@@ -57,6 +58,8 @@ private[spark] class ExecutorPodsLifecycleManager(
// This set is cleaned up when a snapshot containing the updated pod is processed.
private val inactivatedPods = mutable.HashSet.empty[Long]

private val namespace = conf.get(KUBERNETES_NAMESPACE)

def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
snapshotsStore.addSubscriber(eventProcessingInterval) {
@@ -168,23 +171,19 @@ private[spark] class ExecutorPodsLifecycleManager(
// of getting rid of the pod is what matters.
kubernetesClient
.pods()
.inNamespace(namespace)
.withName(updatedPod.getMetadata.getName)
.delete()
} else if (!inactivatedPods.contains(execId) && !isPodInactive(updatedPod)) {
// If the config is set to keep the executor around, mark the pod as "inactive" so it
// can be ignored in future updates from the API server.
logDebug(s"Marking executor ${updatedPod.getMetadata.getName} as inactive since " +
"deletion is disabled.")
val inactivatedPod = new PodBuilder(updatedPod)
.editMetadata()
.addToLabels(Map(SPARK_EXECUTOR_INACTIVE_LABEL -> "true").asJava)
.endMetadata()
.build()

kubernetesClient
.pods()
.inNamespace(namespace)
.withName(updatedPod.getMetadata.getName)
.patch(inactivatedPod)
.edit(executorInactivationFn)
@attilapiros (Contributor, Author) commented:

As I see it, edit is preferred over patch; see kubernetes-client-dsl-usage.
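A minimal sketch of the edit-based flavour (object and method names here are illustrative; the label key stands in for SPARK_EXECUTOR_INACTIVE_LABEL):

```scala
import java.util.function.UnaryOperator

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient

object EditVsPatchSketch {
  // edit() reads the live object, applies the function and sends the
  // resulting patch, so the caller no longer hand-builds a full replacement
  // Pod the way the old patch(inactivatedPod) code did.
  def markInactive(client: KubernetesClient, ns: String, podName: String): Pod = {
    val addInactiveLabel: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
      .editOrNewMetadata()
      .addToLabels("spark-exec-inactive", "true") // stand-in for SPARK_EXECUTOR_INACTIVE_LABEL
      .endMetadata()
      .build()

    client.pods()
      .inNamespace(ns)
      .withName(podName)
      .edit(addInactiveLabel)
  }
}
```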


inactivatedPods += execId
}
@@ -274,4 +273,9 @@ private object ExecutorPodsLifecycleManager {
s"${code}${humanStr}"
}

def executorInactivationFn: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
.editOrNewMetadata()
.addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
.endMetadata()
.build()
}
@@ -85,7 +85,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
Map(SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap, labels)
KubernetesUtils.addOwnerReference(driverPod.orNull, Seq(configMap))
kubernetesClient.configMaps().create(configMap)
kubernetesClient.configMaps().inAnyNamespace().resource(configMap).create()
}

/**
@@ -53,6 +53,7 @@ class StatefulSetPodsAllocator(

val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
.inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
@@ -69,6 +70,7 @@
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withName(pod.getMetadata.getName)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
@@ -99,7 +101,7 @@ class StatefulSetPodsAllocator(
applicationId: String,
resourceProfileId: Int): Unit = {
if (setsCreated.contains(resourceProfileId)) {
val statefulset = kubernetesClient.apps().statefulSets().withName(
val statefulset = kubernetesClient.apps().statefulSets().inNamespace(namespace).withName(
setName(applicationId, resourceProfileId: Int))
statefulset.scale(expected, false /* wait */)
} else {
@@ -169,7 +171,7 @@ class StatefulSetPodsAllocator(
val statefulSet = new io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder()
.withNewMetadata()
.withName(setName(applicationId, resourceProfileId))
.withNamespace(conf.get(KUBERNETES_NAMESPACE))
.withNamespace(namespace)
.endMetadata()
.withNewSpec()
.withPodManagementPolicy("Parallel")
@@ -185,7 +187,7 @@
.build()

addOwnerReference(driverPod.get, Seq(statefulSet))
kubernetesClient.apps().statefulSets().create(statefulSet)
kubernetesClient.apps().statefulSets().inNamespace(namespace).resource(statefulSet).create()
setsCreated += (resourceProfileId)
}
}
@@ -194,7 +196,12 @@
// Cleanup the statefulsets when we stop
setsCreated.foreach { rpid =>
Utils.tryLogNonFatalError {
kubernetesClient.apps().statefulSets().withName(setName(applicationId, rpid)).delete()
kubernetesClient
.apps()
.statefulSets()
.inNamespace(namespace)
.withName(setName(applicationId, rpid))
.delete()
}
}
}
@@ -17,19 +17,28 @@
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList}
import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, Resource}
import io.fabric8.kubernetes.api.model.apps.StatefulSet
import io.fabric8.kubernetes.api.model.apps.StatefulSetList
import io.fabric8.kubernetes.client.dsl.{AnyNamespaceOperation, FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NonNamespaceOperation, PodResource, Resource, RollableScalableResource}

object Fabric8Aliases {
type PODS = MixedOperation[Pod, PodList, PodResource[Pod]]
type PODS = MixedOperation[Pod, PodList, PodResource]
type PODS_WITH_NAMESPACE = NonNamespaceOperation[Pod, PodList, PodResource]
type CONFIG_MAPS = MixedOperation[
ConfigMap, ConfigMapList, Resource[ConfigMap]]
type LABELED_PODS = FilterWatchListDeletable[Pod, PodList]
type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList]
type SINGLE_POD = PodResource[Pod]
type CONFIG_MAPS_OPERATION = AnyNamespaceOperation[ConfigMap, ConfigMapList, Resource[ConfigMap]]
type CONFIG_MAPS_RESOURCE = Resource[ConfigMap]
type LABELED_PODS = FilterWatchListDeletable[Pod, PodList, PodResource]
type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList, Resource[ConfigMap]]
type SINGLE_POD = PodResource
type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
HasMetadata]
type STATEFUL_SET_RES = RollableScalableResource[StatefulSet]
type STATEFUL_SETS = MixedOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
type STATEFUL_SETS_NAMESPACED =
NonNamespaceOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
Resource[PersistentVolumeClaim]]
type LABELED_PERSISTENT_VOLUME_CLAIMS =
FilterWatchListDeletable[PersistentVolumeClaim, PersistentVolumeClaimList]
type LABELED_PERSISTENT_VOLUME_CLAIMS = FilterWatchListDeletable[PersistentVolumeClaim,
PersistentVolumeClaimList, Resource[PersistentVolumeClaim]]
}