Skip to content

Commit

Permalink
[SPARK-43342][K8S] Revert SPARK-39006 Show a directional error messag…
Browse files Browse the repository at this point in the history
…e for executor PVC dynamic allocation failure

### What changes were proposed in this pull request?

This reverts commit b065c94.

### Why are the changes needed?

To remove the regression from SPARK-39006.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes apache#41057

Closes apache#41069 from dongjoon-hyun/SPARK-43342.

Authored-by: Qian.Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 3ba1fa3)
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dcoliversun authored and Gladwin committed Oct 10, 2023
1 parent eea5dac commit b824fc5
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import io.fabric8.kubernetes.api.model._

import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants.{ENV_EXECUTOR_ID, SPARK_APP_ID_LABEL}
import org.apache.spark.internal.config.EXECUTOR_INSTANCES

private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {
Expand Down Expand Up @@ -72,7 +71,6 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
checkPVCClaimName(claimNameTemplate)
claimNameTemplate
.replaceAll(PVC_ON_DEMAND,
s"${conf.resourceNamePrefix}-exec-${c.executorId}$PVC_POSTFIX-$i")
Expand Down Expand Up @@ -122,20 +120,6 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
additionalResources.toSeq
}

private def checkPVCClaimName(claimName: String): Unit = {
val executorInstances = conf.get(EXECUTOR_INSTANCES)
if (executorInstances.isDefined && executorInstances.get > 1) {
// PVC ClaimName should contain OnDemand or SPARK_EXECUTOR_ID
// when requiring multiple executors.
// Else, spark continues to try to create the executor pod.
if (!claimName.contains(PVC_ON_DEMAND) && !claimName.contains(ENV_EXECUTOR_ID)) {
throw new IllegalArgumentException(s"PVC ClaimName: $claimName " +
s"should contain $PVC_ON_DEMAND or $ENV_EXECUTOR_ID " +
"when requiring multiple executors")
}
}
}
}

private[spark] object MountVolumesFeatureStep {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
*/
package org.apache.spark.deploy.k8s.features

import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s._
import org.apache.spark.internal.config.EXECUTOR_INSTANCES

class MountVolumesFeatureStepSuite extends SparkFunSuite {
test("Mounts hostPath volumes") {
Expand Down Expand Up @@ -151,40 +148,6 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
}

test("SPARK-39006: Check PVC ClaimName") {
val claimName = s"pvc-${UUID.randomUUID().toString}"
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
mountReadOnly = true,
KubernetesPVCVolumeConf(claimName)
)
// Create pvc without specified claimName unsuccessfully when requiring multiple executors
val conf = new SparkConf().set(EXECUTOR_INSTANCES, 2)
var executorConf =
KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
var executorStep = new MountVolumesFeatureStep(executorConf)
assertThrows[IllegalArgumentException] {
executorStep.configurePod(SparkPod.initialPod())
}
assert(intercept[IllegalArgumentException] {
executorStep.configurePod(SparkPod.initialPod())
}.getMessage.equals(s"PVC ClaimName: $claimName " +
"should contain OnDemand or SPARK_EXECUTOR_ID when requiring multiple executors"))

// Create and mount pvc with any claimName successfully when requiring one executor
conf.set(EXECUTOR_INSTANCES, 1)
executorConf =
KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
executorStep = new MountVolumesFeatureStep(executorConf)
val executorPod = executorStep.configurePod(SparkPod.initialPod())

assert(executorPod.pod.getSpec.getVolumes.size() === 1)
val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(executorPVC.getClaimName.equals(claimName))
}

test("Mounts emptyDir") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
Expand Down

0 comments on commit b824fc5

Please sign in to comment.