Skip to content

Commit

Permalink
[SPARK-39006][K8S] Show a directional error message for executor PVC …
Browse files Browse the repository at this point in the history
…dynamic allocation failure

### What changes were proposed in this pull request?

This PR aims to show a directional error message for executor PVC dynamic allocation failure.

### Why are the changes needed?

apache#29846 supports dynamic PVC creation/deletion for K8s executors.
apache#29557 supports the execId placeholder in executor PVC conf.
If `spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName` is not set with `onDemand` or `SPARK_EXECUTOR_ID`, Spark will continue to try to create the executor pod.
After this PR, Spark shows a directional error message for this situation.
```plain
ERROR ExecutorPodsSnapshotsStoreImpl: Going to stop due to IllegalArgumentException
java.lang.IllegalArgumentException: PVC ClaimName should contain OnDemand or SPARK_EXECUTOR_ID when multiple executors are required
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add unit test.

Closes apache#36374 from dcoliversun/SPARK-39006.

Authored-by: Qian.Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dcoliversun authored and dongjoon-hyun committed May 8, 2022
1 parent fdcbc8c commit b065c94
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model._

import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants.{ENV_EXECUTOR_ID, SPARK_APP_ID_LABEL}
import org.apache.spark.internal.config.EXECUTOR_INSTANCES

private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {
Expand Down Expand Up @@ -71,6 +72,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
checkPVCClaimName(claimNameTemplate)
claimNameTemplate
.replaceAll(PVC_ON_DEMAND,
s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i")
Expand Down Expand Up @@ -120,6 +122,20 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
additionalResources.toSeq
}

private def checkPVCClaimName(claimName: String): Unit = {
val executorInstances = conf.get(EXECUTOR_INSTANCES)
if (executorInstances.isDefined && executorInstances.get > 1) {
// PVC ClaimName should contain OnDemand or SPARK_EXECUTOR_ID
// when requiring multiple executors.
// Else, spark continues to try to create the executor pod.
if (!claimName.contains(PVC_ON_DEMAND) && !claimName.contains(ENV_EXECUTOR_ID)) {
throw new IllegalArgumentException(s"PVC ClaimName: $claimName " +
s"should contain $PVC_ON_DEMAND or $ENV_EXECUTOR_ID " +
"when requiring multiple executors")
}
}
}
}

private[spark] object MountVolumesFeatureStep {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
*/
package org.apache.spark.deploy.k8s.features

import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.internal.config.EXECUTOR_INSTANCES

class MountVolumesFeatureStepSuite extends SparkFunSuite {
test("Mounts hostPath volumes") {
Expand Down Expand Up @@ -148,6 +151,40 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
}

test("SPARK-39006: Check PVC ClaimName") {
val claimName = s"pvc-${UUID.randomUUID().toString}"
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
mountReadOnly = true,
KubernetesPVCVolumeConf(claimName)
)
// Create pvc without specified claimName unsuccessfully when requiring multiple executors
val conf = new SparkConf().set(EXECUTOR_INSTANCES, 2)
var executorConf =
KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
var executorStep = new MountVolumesFeatureStep(executorConf)
assertThrows[IllegalArgumentException] {
executorStep.configurePod(SparkPod.initialPod())
}
assert(intercept[IllegalArgumentException] {
executorStep.configurePod(SparkPod.initialPod())
}.getMessage.equals(s"PVC ClaimName: $claimName " +
"should contain OnDemand or SPARK_EXECUTOR_ID when requiring multiple executors"))

// Create and mount pvc with any claimName successfully when requiring one executor
conf.set(EXECUTOR_INSTANCES, 1)
executorConf =
KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
executorStep = new MountVolumesFeatureStep(executorConf)
val executorPod = executorStep.configurePod(SparkPod.initialPod())

assert(executorPod.pod.getSpec.getVolumes.size() === 1)
val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(executorPVC.getClaimName.equals(claimName))
}

test("Mounts emptyDir") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
Expand Down

0 comments on commit b065c94

Please sign in to comment.