
[SPARK-39006][K8S] Check PVC claimName must be OnDemand when multiple executors required #36333

Closed
wants to merge 3 commits

Conversation

dcoliversun (Contributor):

What changes were proposed in this pull request?

This PR aims to check that the PVC claimName is OnDemand when multiple executors are required.

Why are the changes needed?

#29846 supports dynamic PVC creation/deletion for K8s executors. When spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName is not set to OnDemand, the driver keeps retrying to create executor pods for a long time, because a PVC with the same name already exists. A configuration sketch that reproduces this is shown below.
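For example, a configuration along these lines (the mount path, storage class, and size are illustrative; the static claim name test-1 matches the PVC name in the log below) makes every executor try to create the same PVC:

    --conf spark.executor.instances=2
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=test-1
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=standard
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=100Gi

The second executor's PVC creation then fails repeatedly with AlreadyExists: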

22/04/22 08:55:47 WARN ExecutorPodsSnapshotsStoreImpl: Exception when notifying snapshot subscriber.
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://kubernetes.default.svc/api/v1/namespaces/default/persistentvolumeclaims. Message: persistentvolumeclaims "test-1" already exists. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=null, kind=persistentvolumeclaims, name=test-1, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=persistentvolumeclaims "test-1" already exists, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=AlreadyExists, status=Failure, additionalProperties={}).
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:697) ~[kubernetes-client-5.10.1.jar:?]
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:676) ~[kubernetes-client-5.10.1.jar:?]
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:629) ~[kubernetes-client-5.10.1.jar:?]
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:566) ~[kubernetes-client-5.10.1.jar:?]
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:527) ~[kubernetes-client-5.10.1.jar:?]
        at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:315) ~[kubernetes-client-5.10.1.jar:?]
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:651) ~[kubernetes-client-5.10.1.jar:?]
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:91) ~[kubernetes-client-5.10.1.jar:?]
        at io.fabric8.kubernetes.client.dsl.base.CreateOnlyResourceOperation.create(CreateOnlyResourceOperation.java:61) ~[kubernetes-client-5.10.1.jar:?]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$requestNewExecutors$3(ExecutorPodsAllocator.scala:415) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at scala.collection.immutable.List.foreach(List.scala:431) ~[scala-library-2.12.15.jar:?]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$requestNewExecutors$1(ExecutorPodsAllocator.scala:408) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) ~[scala-library-2.12.15.jar:?]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.requestNewExecutors(ExecutorPodsAllocator.scala:385) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$35(ExecutorPodsAllocator.scala:349) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$35$adapted(ExecutorPodsAllocator.scala:342) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?]
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?]
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:342) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3(ExecutorPodsAllocator.scala:120) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3$adapted(ExecutorPodsAllocator.scala:120) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber$$processSnapshotsInternal(ExecutorPodsSnapshotsStoreImpl.scala:138) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.processSnapshots(ExecutorPodsSnapshotsStoreImpl.scala:126) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:81) ~[spark-kubernetes_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(Unknown Source) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
        at java.lang.Thread.run(Unknown Source) [?:?]

After this PR, the Spark application exits immediately with an IllegalArgumentException, which helps the user fix the parameter:

2022/04/24 06:47:59 ERROR ExecutorPodsSnapshotsStoreImpl: Going to stop due to IllegalArgumentException
java.lang.IllegalArgumentException: ClaimName of PVC must be OnDemand when multiple executors required
        at org.apache.spark.deploy.k8s.features.MountVolumesFeatureStep.checkPVCOnDemandWhenMultiExecutors(MountVolumesFeatureStep.scala:133) ~[spark-kubernetes_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
        at org.apache.spark.deploy.k8s.features.MountVolumesFeatureStep.$anonfun$constructVolumes$4(MountVolumesFeatureStep.scala:75) ~[spark-kubernetes_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) ~[scala-library-2.12.15.jar:?]
        at scala.collection.Iterator.foreach(Iterator.scala:943) ~[scala-library-2.12.15.jar:?]
        at scala.collection.Iterator.foreach$(Iterator.scala:943) ~[scala-library-2.12.15.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) ~[scala-library-2.12.15.jar:?]
        at scala.collection.IterableLike.foreach(IterableLike.scala:74) ~[scala-library-2.12.15.jar:?]
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73) ~[scala-library-2.12.15.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56) ~[scala-library-2.12.15.jar:?]
        at scala.collection.TraversableLike.map(TraversableLike.scala:286) ~[scala-library-2.12.15.jar:?]
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279) ~[scala-library-2.12.15.jar:?]
        at scala.collection.AbstractTraversable.map(Traversable.scala:108) ~[scala-library-2.12.15.jar:?]
        at org.apache.spark.deploy.k8s.features.MountVolumesFeatureStep.constructVolumes(MountVolumesFeatureStep.scala:58) ~[spark-kubernetes_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
        at org.apache.spark.deploy.k8s.features.MountVolumesFeatureStep.configurePod(MountVolumesFeatureStep.scala:35) ~[spark-kubernetes_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
        at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.$anonfun$buildFromFeatures$5(KubernetesExecutorBuilder.scala:83) ~[spark-kubernetes_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
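
For illustration, here is a minimal sketch of the kind of validation this change performs. The method name checkPVCOnDemandWhenMultiExecutors comes from the stack trace above; the signature, the configuration lookup, and the exact condition are assumptions, not the PR's verbatim code:

    // Sketch only: reject a fixed PVC claimName when more than one executor is requested,
    // since every executor would otherwise try to create/claim the same PVC.
    private def checkPVCOnDemandWhenMultiExecutors(claimName: String): Unit = {
      val numExecutors = conf.sparkConf.getInt("spark.executor.instances", 1) // assumed lookup
      if (numExecutors > 1 && claimName != "OnDemand") {
        throw new IllegalArgumentException(
          "ClaimName of PVC must be OnDemand when multiple executors required")
      }
    }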

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Add new unit test.

@dcoliversun (Contributor, Author):

Passed the integration tests:

$ build/sbt -Pkubernetes -Pkubernetes-integration-tests -Dtest.exclude.tags=r -Dspark.kubernetes.test.imageRepo=kubespark "kubernetes-integration-tests/test"
[info] KubernetesSuite:
[info] - Run SparkPi with no resources (16 seconds, 928 milliseconds)
[info] - Run SparkPi with no resources & statefulset allocation (15 seconds, 302 milliseconds)
[info] - Run SparkPi with a very long application name. (15 seconds, 495 milliseconds)
[info] - Use SparkLauncher.NO_RESOURCE (15 seconds, 706 milliseconds)
[info] - Run SparkPi with a master URL without a scheme. (13 seconds, 751 milliseconds)
[info] - Run SparkPi with an argument. (13 seconds, 437 milliseconds)
[info] - Run SparkPi with custom labels, annotations, and environment variables. (13 seconds, 311 milliseconds)
[info] - All pods have the same service account by default (13 seconds, 899 milliseconds)
[info] - Run extraJVMOptions check on driver (7 seconds, 497 milliseconds)
[info] - Run SparkRemoteFileTest using a remote data file (18 seconds, 99 milliseconds)
[info] - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j2.properties (16 seconds, 843 milliseconds)
[info] - Run SparkPi with env and mount secrets. (29 seconds, 539 milliseconds)
[info] - Run PySpark on simple pi.py example (23 seconds, 127 milliseconds)
[info] - Run PySpark to test a pyfiles example (19 seconds, 417 milliseconds)
[info] - Run PySpark with memory customization (17 seconds, 704 milliseconds)
[info] - Run in client mode. (10 seconds, 484 milliseconds)
[info] - Start pod creation from template (15 seconds, 88 milliseconds)
[info] - SPARK-38398: Schedule pod creation from template (26 seconds, 416 milliseconds)
[info] - PVs with local hostpath storage on statefulsets (17 seconds, 321 milliseconds)
[info] - PVs with local hostpath and storageClass on statefulsets (17 seconds, 712 milliseconds)
[info] - PVs with local storage (18 seconds, 933 milliseconds)
[info] - Launcher client dependencies (2 minutes, 34 seconds)
[info] - SPARK-33615: Launcher client archives (1 minute, 50 seconds)
[info] - SPARK-33748: Launcher python client respecting PYSPARK_PYTHON (1 minute, 41 seconds)
[info] - SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python (1 minute, 40 seconds)
[info] - Launcher python client dependencies using a zip file (1 minute, 41 seconds)
[info] - Test basic decommissioning (52 seconds, 329 milliseconds)
[info] - Test basic decommissioning with shuffle cleanup (49 seconds, 25 milliseconds)
[info] - Test decommissioning with dynamic allocation & shuffle cleanups (2 minutes, 52 seconds)
[info] - Test decommissioning timeouts (48 seconds, 306 milliseconds)
[info] - SPARK-37576: Rolling decommissioning (1 minute, 9 seconds)
[info] Run completed in 27 minutes, 27 seconds.
[info] Total number of tests run: 31
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 31, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 2159 s (35:59), completed 2022-4-24 16:41:04

@dcoliversun (Contributor, Author):

@dongjoon-hyun
It would be good if you could take a look when you have time, thanks!

@AmplabJenkins:

Can one of the admins verify this patch?

The following review thread is attached to the new unit test in the diff:

val executorConf =
  KubernetesTestConf.createExecutorConf(sparkConf = conf, volumes = Seq(volumeConf))
val executorStep = new MountVolumesFeatureStep(executorConf)
assertThrows[IllegalArgumentException] {
Member (review comment):

IMO it would be good to assert on the exception message too. Or use a specialized exception.
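
For example (a sketch using ScalaTest's intercept; the call inside the block is illustrative, not the PR's exact test body):

    val e = intercept[IllegalArgumentException] {
      executorStep.configurePod(SparkPod.initialPod())
    }
    assert(e.getMessage.contains("must be OnDemand"))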

dcoliversun (Contributor, Author) replied:

Thanks for your help. I have addressed this comment.

dcoliversun and others added 2 commits April 26, 2022 17:55
…ark/deploy/k8s/features/MountVolumesFeatureStep.scala

Co-authored-by: Martin Grigorov <[email protected]>
@dongjoon-hyun (Member) left a comment:

Sorry, but -1 for this proposal because it will break SPARK-32713.
Apache Spark has supported the claimName=pvc-spark-SPARK_EXECUTOR_ID style since before the OnDemand feature.
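
For reference, that pre-existing style (sketch; the volume name data and the mount path are illustrative) lets each executor bind to its own pre-created PVC by templating the executor ID into the claim name, so a check that requires claimName to be exactly OnDemand would reject a valid multi-executor setup:

    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/data
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=pvc-spark-SPARK_EXECUTOR_ID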

@dongjoon-hyun (Member):

Let me close this first. We can continue to discuss and reopen it if there is a valid reason, @dcoliversun.

@dcoliversun (Contributor, Author):

@dongjoon-hyun I can also check that claimName contains ENV_EXECUTOR_ID, to satisfy SPARK-32713.
This PR is meant to improve the user experience. When the PVC cannot be created, Spark keeps trying to create the executor pod. Users who are not familiar with this configuration will spend time and energy troubleshooting the code. If an intuitive exception were thrown here, it would help users quickly locate and solve the problem.

@dongjoon-hyun (Member) commented Apr 27, 2022:

To me, this PR sounds like it makes the UX misleading instead of improving it.

Check PVC claimName must be OnDemand when multiple executors required

@dcoliversun (Contributor, Author):

I could change the title to Check PVC dynamic allocation when multiple executors required. @dongjoon-hyun

@dongjoon-hyun (Member):

You are still misunderstanding SPARK-32713, which supports multiple executors :)

I could change the title as Check PVC dynamic allocation when multiple executors required

@dongjoon-hyun (Member):

@dcoliversun If you want to continue in your direction, I'd recommend revising the JIRA issue first, e.g., to Show a directional error message for .... Then make another PR for that error-message improvement.

@dcoliversun (Contributor, Author):

@dongjoon-hyun Thanks for your help; I will revise the JIRA issue. I want to confirm that my understanding is correct:
when claimName contains OnDemand or SPARK_EXECUTOR_ID, the PVC can be allocated dynamically.

@dcoliversun (Contributor, Author):

Replaced with #36374.
