diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index 78dd6ec21ed34..76e79024e715d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model._
 
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants.{ENV_EXECUTOR_ID, SPARK_APP_ID_LABEL}
+import org.apache.spark.internal.config.EXECUTOR_INSTANCES
 
 private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
@@ -71,6 +72,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
       case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
         val claimName = conf match {
           case c: KubernetesExecutorConf =>
+            checkPVCOnDemandWhenMultiExecutors(claimNameTemplate)
             claimNameTemplate
               .replaceAll(PVC_ON_DEMAND,
                 s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i")
@@ -120,6 +122,16 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
     additionalResources.toSeq
   }
+
+  private def checkPVCOnDemandWhenMultiExecutors(claimName: String): Unit = {
+    val executorInstances = conf.get(EXECUTOR_INSTANCES)
+    if (executorInstances.isEmpty) return
+    if (claimName != PVC_ON_DEMAND && executorInstances.get > 1) {
+      throw new IllegalArgumentException("ClaimName of PVC must be " +
+        PVC_ON_DEMAND +
+        " when multiple executors are required")
+    }
+  }
 }
 
 private[spark] object MountVolumesFeatureStep {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index 468d1dde9fb6d..d09b44dbb2c2f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -18,8 +18,9 @@ package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.internal.config.EXECUTOR_INSTANCES
 
 class MountVolumesFeatureStepSuite extends SparkFunSuite {
   test("Mounts hostPath volumes") {
@@ -148,6 +149,26 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
   }
 
+  test("SPARK-39006 PVC claimName must be onDemand when multiple executors") {
+    val volumeConf = KubernetesVolumeSpec(
+      "testVolume",
+      "/tmp",
+      "",
+      mountReadOnly = true,
+      KubernetesPVCVolumeConf("testClaimName")
+    )
+    val conf = new SparkConf().set(EXECUTOR_INSTANCES, 2)
+    val executorConf =
+      KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
+    val executorStep = new MountVolumesFeatureStep(executorConf)
+    assertThrows[IllegalArgumentException] {
+      executorStep.configurePod(SparkPod.initialPod())
+    }
+    assert(intercept[IllegalArgumentException] {
+      executorStep.configurePod(SparkPod.initialPod())
+    }.getMessage.contains("ClaimName of PVC must be OnDemand"))
+  }
+
   test("Mounts emptyDir") {
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",
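
Note (not part of the patch): below is a minimal sketch of the user-facing configuration this check guards against, written against the standard spark.kubernetes.executor.volumes.persistentVolumeClaim.* settings. The object name PvcClaimNameSketch, volume name "data", mount path "/data", and claim name "my-static-pvc" are illustrative only. Because checkPVCOnDemandWhenMultiExecutors returns early when spark.executor.instances is unset, the new validation only fires when that setting is explicitly greater than one.

import org.apache.spark.SparkConf

object PvcClaimNameSketch {
  // With this change, executor pod construction is expected to throw
  // IllegalArgumentException("ClaimName of PVC must be OnDemand when multiple
  // executors are required") for a setup like this: two executors sharing one
  // statically named claim.
  val rejected: SparkConf = new SparkConf()
    .set("spark.executor.instances", "2")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path", "/data")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", "my-static-pvc")

  // The special OnDemand claim name, which yields one PVC per executor pod,
  // passes the check.
  val accepted: SparkConf = rejected.clone
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", "OnDemand")
}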