diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index 78dd6ec21ed34..d47024ca9fe0a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model._ import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Constants.{ENV_EXECUTOR_ID, SPARK_APP_ID_LABEL} +import org.apache.spark.internal.config.EXECUTOR_INSTANCES private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep { @@ -71,6 +72,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) => val claimName = conf match { case c: KubernetesExecutorConf => + checkPVCClaimName(claimNameTemplate) claimNameTemplate .replaceAll(PVC_ON_DEMAND, s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i") @@ -120,6 +122,20 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { additionalResources.toSeq } + + private def checkPVCClaimName(claimName: String): Unit = { + val executorInstances = conf.get(EXECUTOR_INSTANCES) + if (executorInstances.isDefined && executorInstances.get > 1) { + // PVC ClaimName should contain OnDemand or SPARK_EXECUTOR_ID + // when requiring multiple executors. + // Else, spark continues to try to create the executor pod. + if (!claimName.contains(PVC_ON_DEMAND) && !claimName.contains(ENV_EXECUTOR_ID)) { + throw new IllegalArgumentException(s"PVC ClaimName: $claimName " + + s"should contain $PVC_ON_DEMAND or $ENV_EXECUTOR_ID " + + "when requiring multiple executors") + } + } + } } private[spark] object MountVolumesFeatureStep { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 468d1dde9fb6d..e428e54d661b3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -16,10 +16,13 @@ */ package org.apache.spark.deploy.k8s.features +import java.util.UUID + import scala.collection.JavaConverters._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.internal.config.EXECUTOR_INSTANCES class MountVolumesFeatureStepSuite extends SparkFunSuite { test("Mounts hostPath volumes") { @@ -148,6 +151,40 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0")) } + test("SPARK-39006: Check PVC ClaimName") { + val claimName = s"pvc-${UUID.randomUUID().toString}" + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + "", + mountReadOnly = true, + KubernetesPVCVolumeConf(claimName) + ) + // Create pvc without specified claimName unsuccessfully when requiring multiple executors + val conf = new SparkConf().set(EXECUTOR_INSTANCES, 2) + var executorConf = + KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf)) + var executorStep = new MountVolumesFeatureStep(executorConf) + assertThrows[IllegalArgumentException] { + executorStep.configurePod(SparkPod.initialPod()) + } + assert(intercept[IllegalArgumentException] { + executorStep.configurePod(SparkPod.initialPod()) + }.getMessage.equals(s"PVC ClaimName: $claimName " + + "should contain OnDemand or SPARK_EXECUTOR_ID when requiring multiple executors")) + + // Create and mount pvc with any claimName successfully when requiring one executor + conf.set(EXECUTOR_INSTANCES, 1) + executorConf = + KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf)) + executorStep = new MountVolumesFeatureStep(executorConf) + val executorPod = executorStep.configurePod(SparkPod.initialPod()) + + assert(executorPod.pod.getSpec.getVolumes.size() === 1) + val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(executorPVC.getClaimName.equals(claimName)) + } + test("Mounts emptyDir") { val volumeConf = KubernetesVolumeSpec( "testVolume",