
[SPARK-41781][K8S] Add the ability to create pvc before creating driver/executor pod
dcoliversun committed Jan 10, 2023
1 parent 15a0f55 commit ad014e5
Showing 10 changed files with 293 additions and 53 deletions.
@@ -20,4 +20,5 @@ import io.fabric8.kubernetes.api.model.HasMetadata

private[spark] case class KubernetesExecutorSpec(
    pod: SparkPod,
+    executorPreKubernetesResources: Seq[HasMetadata],
    executorKubernetesResources: Seq[HasMetadata])
@@ -23,7 +23,7 @@ import java.util.{Collections, UUID}

import scala.collection.JavaConverters._

-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, OwnerReferenceBuilder, PersistentVolumeClaim, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -414,4 +414,52 @@ object KubernetesUtils extends Logging {
        .build()
    }
  }
+
+  /**
+   * Create a pre-resource that is needed before pod creation.
+   */
+  @Since("3.4.0")
+  def createPreResource(
+      client: KubernetesClient,
+      resource: HasMetadata,
+      namespace: String): Unit = {
+    resource match {
+      case pvc: PersistentVolumeClaim =>
+        client.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
+      case other =>
+        client.resourceList(Seq(other): _*).createOrReplace()
+    }
+  }
+
+  /**
+   * Refresh the OwnerReference in the given resource,
+   * making the driver or executor pod an owner of it.
+   */
+  @Since("3.4.0")
+  def refreshOwnerReferenceInResource(
+      client: KubernetesClient,
+      resource: HasMetadata,
+      namespace: String,
+      pod: Pod): Unit = {
+    resource match {
+      case pvc: PersistentVolumeClaim =>
+        // Fetch the live claim, attach the pod as owner, and patch the live
+        // object back so the ownerReference actually lands on the server.
+        val createdPVC =
+          client
+            .persistentVolumeClaims()
+            .inNamespace(namespace)
+            .withName(pvc.getMetadata.getName)
+            .get()
+        addOwnerReference(pod, Seq(createdPVC))
+        logDebug(s"Trying to refresh PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
+          s"OwnerReference ${createdPVC.getMetadata.getOwnerReferences}")
+        client
+          .persistentVolumeClaims()
+          .inNamespace(namespace)
+          .withName(pvc.getMetadata.getName)
+          .patch(createdPVC)
+      case other =>
+        addOwnerReference(pod, Seq(other))
+        client.resourceList(Seq(other): _*).createOrReplace()
+    }
+  }
}
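To make the intended ordering concrete, here is a minimal sketch of how a caller inside the org.apache.spark package (the object is private[spark]) could drive the two helpers; the client, resource names, and namespace are hypothetical stand-ins, not part of this commit:

import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimBuilder, PodBuilder, Quantity}
import io.fabric8.kubernetes.client.KubernetesClientBuilder

import org.apache.spark.deploy.k8s.KubernetesUtils

val client = new KubernetesClientBuilder().build()
val namespace = "default"

// Phase 1: create the claim before any pod references it.
val pvc = new PersistentVolumeClaimBuilder()
  .withNewMetadata().withName("demo-pvc").endMetadata()
  .withNewSpec()
    .withAccessModes("ReadWriteOnce")
    .withNewResources().addToRequests("storage", new Quantity("1Gi")).endResources()
  .endSpec()
  .build()
KubernetesUtils.createPreResource(client, pvc, namespace)

// Phase 2: create the pod that will mount the claim.
val pod = client.pods().inNamespace(namespace)
  .resource(new PodBuilder().withNewMetadata().withName("demo-pod").endMetadata().build())
  .create()

// Phase 3: re-read the live claim and patch its ownerReference to point at the
// pod, so deleting the pod lets Kubernetes garbage-collect the claim.
KubernetesUtils.refreshOwnerReferenceInResource(client, pvc, namespace, pod)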
@@ -29,7 +29,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
  extends KubernetesFeatureConfigStep {
  import MountVolumesFeatureStep._

-  val additionalResources = ArrayBuffer.empty[HasMetadata]
+  val additionalPreResources = ArrayBuffer.empty[HasMetadata]

  override def configurePod(pod: SparkPod): SparkPod = {
    val (volumeMounts, volumes) = constructVolumes(conf.volumes).unzip
@@ -82,7 +82,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
              .replaceAll(PVC_ON_DEMAND, s"${conf.resourceNamePrefix}-driver$PVC_POSTFIX-$i")
        }
        if (storageClass.isDefined && size.isDefined) {
-          additionalResources.append(new PersistentVolumeClaimBuilder()
+          additionalPreResources.append(new PersistentVolumeClaimBuilder()
            .withKind(PVC)
            .withApiVersion("v1")
            .withNewMetadata()
@@ -119,8 +119,8 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
    }
  }

-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    additionalResources.toSeq
+  override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
+    additionalPreResources.toSeq
  }

  private def checkPVCClaimName(claimName: String): Unit = {
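For reference, the storageClass/size branch above is what an on-demand claim declared through Spark conf exercises; a hedged example of the driver-side settings (the volume name data and the values are illustrative):

import org.apache.spark.SparkConf

// Each option maps to spark.kubernetes.driver.volumes.[type].[name].*
val conf = new SparkConf()
  .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path", "/data")
  .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName", "OnDemand")
  .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.storageClass", "standard")
  .set("spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.sizeLimit", "2Gi")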
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkApplication
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.deploy.k8s.KubernetesUtils.{addOwnerReference, createPreResource, refreshOwnerReferenceInResource}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

@@ -136,14 +136,16 @@ private[spark] class Client(

    // setup resources before pod creation
    val preKubernetesResources = resolvedDriverSpec.driverPreKubernetesResources
-    try {
-      kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
-    } catch {
-      case NonFatal(e) =>
-        logError("Please check \"kubectl auth can-i create [resource]\" first." +
-          " It should be yes. And please also check your feature step implementation.")
-        kubernetesClient.resourceList(preKubernetesResources: _*).delete()
-        throw e
+    preKubernetesResources.foreach { resource =>
+      try {
+        createPreResource(kubernetesClient, resource, conf.namespace)
+      } catch {
+        case NonFatal(e) =>
+          logError("Please check \"kubectl auth can-i create [resource]\" first." +
+            " It should be yes. And please also check the feature step implementation.")
+          kubernetesClient.resourceList(Seq(resource): _*).delete()
+          throw e
+      }
    }

    var watch: Watch = null
@@ -159,14 +161,16 @@
    }

    // Refresh all pre-resources' owner references
-    try {
-      addOwnerReference(createdDriverPod, preKubernetesResources)
-      kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
-    } catch {
-      case NonFatal(e) =>
-        kubernetesClient.pods().resource(createdDriverPod).delete()
-        kubernetesClient.resourceList(preKubernetesResources: _*).delete()
-        throw e
+    preKubernetesResources.foreach { resource =>
+      try {
+        refreshOwnerReferenceInResource(kubernetesClient, resource, conf.namespace,
+          createdDriverPod)
+      } catch {
+        case NonFatal(e) =>
+          kubernetesClient.pods().resource(createdDriverPod).delete()
+          kubernetesClient.resourceList(Seq(resource): _*).delete()
+          throw e
+      }
    }

    // setup resources after pod creation, and refresh all resources' owner references
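The pre-resources these loops walk over are produced by feature steps. A minimal sketch of a custom step that contributes one; the class and ConfigMap names are made up for illustration:

import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata}

import org.apache.spark.deploy.k8s.SparkPod
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

class DemoPreResourceStep extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod = pod

  // Returned resources are created before the pod, then re-patched with the
  // pod's ownerReference once the pod exists.
  override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = Seq(
    new ConfigMapBuilder()
      .withNewMetadata().withName("demo-pre-configmap").endMetadata()
      .addToData("key", "value")
      .build())
}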
@@ -31,7 +31,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
-import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.deploy.k8s.KubernetesUtils.{createPreResource, refreshOwnerReferenceInResource}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.resource.ResourceProfile
@@ -434,34 +434,59 @@ class ExecutorPodsAllocator(
        .addToContainers(executorPod.container)
        .endSpec()
        .build()
-      val resources = replacePVCsIfNeeded(
-        podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
-      val createdExecutorPod =
-        kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
-      try {
-        addOwnerReference(createdExecutorPod, resources)
-        resources
-          .filter(_.getKind == "PersistentVolumeClaim")
-          .foreach { resource =>
-            if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
-              addOwnerReference(driverPod.get, Seq(resource))
-            }
-            val pvc = resource.asInstanceOf[PersistentVolumeClaim]
-            logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
-              s"StorageClass ${pvc.getSpec.getStorageClassName}")
-            kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
+      val preResources = replacePVCsIfNeeded(
+        podWithAttachedContainer, resolvedExecutorSpec.executorPreKubernetesResources, reusablePVCs)
+
+      // Create pre-resources (e.g. on-demand PVCs) before the executor pod itself.
+      preResources.foreach { resource =>
+        try {
+          createPreResource(kubernetesClient, resource, namespace)
+          if (resource.isInstanceOf[PersistentVolumeClaim]) {
+            PVC_COUNTER.incrementAndGet()
+          }
+        } catch {
+          case NonFatal(e) =>
+            logError("Please check \"kubectl auth can-i create [resource]\" first." +
+              " It should be yes. And please also check the feature step implementation.")
+            kubernetesClient.resourceList(Seq(resource): _*).delete()
+            throw e
+        }
+      }
+
+      var createdExecutorPod: Pod = null
+      try {
+        createdExecutorPod =
+          kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
+        newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
+        logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+      } catch {
+        case NonFatal(e) =>
+          // If create() itself failed, there is no pod to delete yet.
+          if (createdExecutorPod != null) {
+            kubernetesClient.pods()
+              .inNamespace(namespace)
+              .resource(createdExecutorPod)
+              .delete()
+          }
+          kubernetesClient.resourceList(preResources: _*).delete()
+          logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")
+          throw e
+      }
+
+      // Refresh all pre-resources' owner references. A PVC may be owned by the
+      // driver pod so it can outlive individual executors; everything else is
+      // owned by the executor pod that uses it.
+      preResources.foreach { resource =>
+        try {
+          if (resource.isInstanceOf[PersistentVolumeClaim] &&
+              conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
+            refreshOwnerReferenceInResource(kubernetesClient, resource, namespace,
+              driverPod.get)
+          } else {
+            refreshOwnerReferenceInResource(kubernetesClient, resource, namespace,
+              createdExecutorPod)
+          }
+        } catch {
+          case NonFatal(e) =>
+            kubernetesClient.pods().inNamespace(namespace).resource(createdExecutorPod).delete()
+            kubernetesClient.resourceList(Seq(resource): _*).delete()
+            throw e
+        }
+      }
    }
  }
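KUBERNETES_DRIVER_OWN_PVC above corresponds to a user-facing setting, often paired with claim reuse; assuming on-demand PVCs are in play, the intent is that claims survive executor loss and are garbage-collected with the driver:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Driver pod owns on-demand executor PVCs (KUBERNETES_DRIVER_OWN_PVC).
  .set("spark.kubernetes.driver.ownPersistentVolumeClaims", "true")
  // Allow a replacement executor to pick up an existing claim.
  .set("spark.kubernetes.driver.reusePersistentVolumeClaims", "true")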

@@ -75,15 +75,18 @@

    val spec = KubernetesExecutorSpec(
      initialPod,
+      executorPreKubernetesResources = Seq.empty,
      executorKubernetesResources = Seq.empty)

    // If using a template this will always get the resources from that and combine
    // them with any Spark conf or ResourceProfile resources.
    features.foldLeft(spec) { case (spec, feature) =>
      val configuredPod = feature.configurePod(spec.pod)
+      val addedPreResources = feature.getAdditionalPreKubernetesResources()
      val addedResources = feature.getAdditionalKubernetesResources()
      KubernetesExecutorSpec(
        configuredPod,
+        spec.executorPreKubernetesResources ++ addedPreResources,
        spec.executorKubernetesResources ++ addedResources)
    }
  }
@@ -127,10 +127,10 @@ class StatefulSetPodsAllocator(
    val meta = executorPod.pod.getMetadata()

    // Resources that need to be created, volumes are per-pod which is all we care about here.
-    val resources = resolvedExecutorSpec.executorKubernetesResources
+    val preResources = resolvedExecutorSpec.executorPreKubernetesResources
    // We'll let PVCs be handled by the statefulset. Note user is responsible for
    // cleaning up PVCs. Future work: integrate with KEP1847 once stabilized.
-    val dynamicVolumeClaims = resources.filter(_.getKind == "PersistentVolumeClaim")
+    val dynamicVolumeClaims = preResources.filter(_.getKind == "PersistentVolumeClaim")
      .map(_.asInstanceOf[PersistentVolumeClaim])
    // Remove the dynamic volumes from our pod
    val dynamicVolumeClaimNames: Set[String] = dynamicVolumeClaims.map(_.getMetadata().getName())
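This path only runs under the StatefulSet-based allocator; assuming defaults otherwise, it is selected with:

import org.apache.spark.SparkConf

// "statefulset" switches allocation from individual pods to a StatefulSet,
// which then owns the volumeClaimTemplates derived from the claims above.
val conf = new SparkConf()
  .set("spark.kubernetes.allocation.pods.allocator", "statefulset")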