[SPARK-40458][K8S] Bump Kubernetes Client Version to 6.1.1 #37990
SparkKubernetesClientFactory.scala
@@ -21,7 +21,7 @@ import java.io.File
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
 import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
 import io.fabric8.kubernetes.client.Config.autoConfigure
 import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
@@ -115,7 +115,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
     }
     logDebug("Kubernetes client config: " +
       new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config))
-    new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config)
+    new KubernetesClientBuilder()
+      .withHttpClientFactory(factoryWithCustomDispatcher)
+      .withConfig(config)
+      .build()
   }

   private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)

Review comment on the new KubernetesClientBuilder() line: Because of apiimpl-split:
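For context, a minimal sketch of how a client is assembled with the fabric8 6.x builder API. The master URL and namespace below are placeholder values, not taken from the PR:

```scala
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, KubernetesClient, KubernetesClientBuilder}

object ClientBuilderSketch {
  def main(args: Array[String]): Unit = {
    // Build a Config explicitly; auto-configuration from kubeconfig also works.
    val config: Config = new ConfigBuilder()
      .withMasterUrl("https://kubernetes.default.svc") // placeholder URL
      .build()

    // 6.x removed the concrete DefaultKubernetesClient; the builder wires the
    // Config (and optionally an HttpClient.Factory) into a KubernetesClient.
    val client: KubernetesClient = new KubernetesClientBuilder()
      .withConfig(config)
      .build()

    try {
      client.pods().inNamespace("default").list().getItems.forEach { pod =>
        println(pod.getMetadata.getName)
      }
    } finally {
      client.close()
    }
  }
}
```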
K8SSparkSubmitOperation.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.k8s.submit
 import scala.collection.JavaConverters._

 import K8SSparkSubmitOperation.getGracePeriod
-import io.fabric8.kubernetes.api.model.{Pod, PodList}
+import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource

 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkSubmitOperation
@@ -32,25 +32,23 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
 import org.apache.spark.util.{CommandLineLoggingUtils, Utils}

 private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
-  type NON_NAMESPACED_PODS =
-    NonNamespaceOperation[Pod, PodList, PodResource[Pod]]
   def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
     (implicit client: KubernetesClient): Unit
   def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
     (implicit client: KubernetesClient): Unit
-  def listPodsInNameSpace(namespace: Option[String])
-    (implicit client: KubernetesClient): NON_NAMESPACED_PODS = {
+  def getPod(namespace: Option[String], name: String)
+    (implicit client: KubernetesClient): PodResource = {
     namespace match {
-      case Some(ns) => client.pods.inNamespace(ns)
-      case None => client.pods
+      case Some(ns) => client.pods.inNamespace(ns).withName(name)
+      case None => client.pods.withName(name)
     }
   }
 }

 private class KillApplication extends K8sSubmitOp {
   override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
     (implicit client: KubernetesClient): Unit = {
-    val podToDelete = listPodsInNameSpace(namespace).withName(pName)
+    val podToDelete = getPod(namespace, pName)

     if (Option(podToDelete).isDefined) {
       getGracePeriod(sparkConf) match {
@@ -66,19 +64,11 @@ private class KillApplication extends K8sSubmitOp {
     (implicit client: KubernetesClient): Unit = {
     if (pods.nonEmpty) {
       pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") }
-      val listedPods = listPodsInNameSpace(namespace)

       getGracePeriod(sparkConf) match {
         case Some(period) =>
-          // this is not using the batch api because no option is provided
-          // when using the grace period.
-          pods.foreach { pod =>
-            listedPods
-              .withName(pod.getMetadata.getName)
-              .withGracePeriod(period)
-              .delete()
-          }
-        case _ => listedPods.delete(pods.asJava)
+          client.resourceList(pods.asJava).withGracePeriod(period).delete()
+        case _ =>
+          client.resourceList(pods.asJava).delete()
       }
     } else {
       printMessage("No applications found.")
@@ -89,7 +79,7 @@ private class KillApplication extends K8sSubmitOp {
 private class ListStatus extends K8sSubmitOp {
   override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
     (implicit client: KubernetesClient): Unit = {
-    val pod = listPodsInNameSpace(namespace).withName(pName).get()
+    val pod = getPod(namespace, pName).get()
     if (Option(pod).isDefined) {
       printMessage("Application status (driver): " +
         Option(pod).map(formatPodState).getOrElse("unknown."))
@@ -144,14 +134,13 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
         kubernetesClient
           .pods
       }
+      ops.withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
       val pods = ops
         .list()
         .getItems
         .asScala
         .filter { pod =>
-          val meta = pod.getMetadata
-          meta.getName.startsWith(pName.stripSuffix("*")) &&
-            meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
+          pod.getMetadata.getName.startsWith(pName.stripSuffix("*"))
         }.toList
       op.executeOnGlob(pods, namespace, sparkConf)
     } else {

Review comment on the added withLabel line: Do the label based filtering as early as possible.
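Moving the role check into withLabel turns it into a server-side label selector, so the API server returns only driver pods and the client-side filter shrinks to the name-prefix test. A sketch of the idea with placeholder label values standing in for the SPARK_ROLE_LABEL and SPARK_POD_DRIVER_ROLE constants:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.KubernetesClientBuilder

object LabelSelectorSketch {
  def main(args: Array[String]): Unit = {
    val client = new KubernetesClientBuilder().build()
    try {
      // Sent to the API server as a labelSelector query parameter, so
      // non-driver pods never cross the wire.
      val drivers = client.pods()
        .inNamespace("default") // hypothetical namespace
        .withLabel("spark-role", "driver") // placeholder for the Spark constants
        .list().getItems.asScala
        .filter(_.getMetadata.getName.startsWith("spark-pi")) // hypothetical prefix
        .toList

      drivers.foreach(p => println(p.getMetadata.getName))
    } finally client.close()
  }
}
```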
ExecutorPodsLifecycleManager.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler.cluster.k8s

 import java.util.concurrent.TimeUnit
+import java.util.function.UnaryOperator

 import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
@@ -57,6 +58,8 @@ private[spark] class ExecutorPodsLifecycleManager(
   // This set is cleaned up when a snapshot containing the updated pod is processed.
   private val inactivatedPods = mutable.HashSet.empty[Long]

+  private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
   def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
     val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
     snapshotsStore.addSubscriber(eventProcessingInterval) {
@@ -168,23 +171,19 @@ private[spark] class ExecutorPodsLifecycleManager(
       // of getting rid of the pod is what matters.
       kubernetesClient
         .pods()
+        .inNamespace(namespace)
         .withName(updatedPod.getMetadata.getName)
         .delete()
     } else if (!inactivatedPods.contains(execId) && !isPodInactive(updatedPod)) {
       // If the config is set to keep the executor around, mark the pod as "inactive" so it
       // can be ignored in future updates from the API server.
       logDebug(s"Marking executor ${updatedPod.getMetadata.getName} as inactive since " +
         "deletion is disabled.")
-      val inactivatedPod = new PodBuilder(updatedPod)
-        .editMetadata()
-        .addToLabels(Map(SPARK_EXECUTOR_INACTIVE_LABEL -> "true").asJava)
-        .endMetadata()
-        .build()
-
       kubernetesClient
         .pods()
+        .inNamespace(namespace)
         .withName(updatedPod.getMetadata.getName)
-        .patch(inactivatedPod)
+        .edit(executorInactivationFn)

       inactivatedPods += execId
     }

Review comment on the edit(executorInactivationFn) line: As I see the …
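Both calls above now name the namespace explicitly, read once from KUBERNETES_NAMESPACE in the Spark conf, instead of relying on whatever default namespace the client was configured with. A minimal sketch of the pattern, with a hypothetical environment-variable fallback standing in for the conf lookup:

```scala
import io.fabric8.kubernetes.client.KubernetesClientBuilder

object ExplicitNamespaceSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for conf.get(KUBERNETES_NAMESPACE) in the PR.
    val namespace = sys.env.getOrElse("SPARK_K8S_NAMESPACE", "default")

    val client = new KubernetesClientBuilder().build()
    try {
      // Scoping every pod operation explicitly keeps the behavior independent
      // of the namespace the client happened to be auto-configured with.
      val pod = client.pods()
        .inNamespace(namespace)
        .withName("spark-exec-1") // hypothetical executor pod name
        .get()
      println(Option(pod).map(_.getMetadata.getName).getOrElse("absent"))
    } finally client.close()
  }
}
```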
@@ -274,4 +273,9 @@ private object ExecutorPodsLifecycleManager {
     s"${code}${humanStr}"
   }

+  def executorInactivationFn: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
+    .editOrNewMetadata()
+    .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
+    .endMetadata()
+    .build()
 }
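edit takes a java.util.function.UnaryOperator[Pod]: the client reads the live pod, applies the function, and submits the resulting change, which the PR uses in place of the previous patch(inactivatedPod) call. A sketch of the same pattern with a hypothetical label key and pod name:

```scala
import java.util.function.UnaryOperator

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClientBuilder

object EditPodSketch {
  // Same shape as the PR's executorInactivationFn: a pure Pod => Pod function.
  val markInactive: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
    .editOrNewMetadata()
      .addToLabels("spark-exec-inactive", "true") // hypothetical label key
    .endMetadata()
    .build()

  def main(args: Array[String]): Unit = {
    val client = new KubernetesClientBuilder().build()
    try {
      // The client fetches the current pod, applies markInactive, and submits
      // the difference, standing in for the removed patch(inactivatedPod) call.
      client.pods()
        .inNamespace("default") // hypothetical namespace
        .withName("spark-exec-1") // hypothetical pod name
        .edit(markInactive)
    } finally client.close()
  }
}
```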
Review comment: https://github.com/fabric8io/kubernetes-client/blob/master/doc/MIGRATION-v6.md#okhttp-httpclient:
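The linked migration-guide section covers the OkHttp split: in 6.x the OkHttp-backed HttpClient lives in its own artifact (kubernetes-httpclient-okhttp) and is supplied through an HttpClient.Factory, which is how this PR keeps Spark's custom dispatcher working. A hedged sketch of that wiring, mirroring factoryWithCustomDispatcher in SparkKubernetesClientFactory; the dispatcher limit is a made-up tuning value:

```scala
import io.fabric8.kubernetes.client.KubernetesClientBuilder
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
import okhttp3.{Dispatcher, OkHttpClient}

object CustomHttpFactorySketch {
  def main(args: Array[String]): Unit = {
    val dispatcher = new Dispatcher()
    dispatcher.setMaxRequests(256) // hypothetical tuning value

    // Subclass OkHttpClientFactory to adjust the underlying OkHttp builder.
    val factory = new OkHttpClientFactory() {
      override protected def additionalConfig(builder: OkHttpClient.Builder): Unit = {
        builder.dispatcher(dispatcher)
      }
    }

    val client = new KubernetesClientBuilder()
      .withHttpClientFactory(factory)
      .build()
    try {
      println(client.getKubernetesVersion.getGitVersion)
    } finally client.close()
  }
}
```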