Skip to content

Commit

Permalink
[SPARK-41525][K8S] Improve onNewSnapshots to use unique lists of kn…
Browse files Browse the repository at this point in the history
…own executor IDs and PVC names

### What changes were proposed in this pull request?

This PR improve `ExecutorPodsAllocator.onNewSnapshots` by removing duplications at `k8sKnownExecIds` and `k8sKnownPVCNames`. In the large cluster, this causes inefficiency.

### Why are the changes needed?

The existing variables have lots of duplications because `snapshots` is `Seq[ExecutorPodsSnapshot]`.
```
val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys)
```

For example, if we print out the values, it looks like the following.
```
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review because this is an improvement on the local variable computation.

Closes #39070 from dongjoon-hyun/SPARK-41525.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dongjoon-hyun committed Dec 15, 2022
1 parent 35fa5e6 commit 8b2a2d1
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class ExecutorPodsAllocator(
applicationId: String,
schedulerBackend: KubernetesClusterSchedulerBackend,
snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys)
val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys).distinct
newlyCreatedExecutors --= k8sKnownExecIds
schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds

Expand All @@ -162,7 +162,7 @@ class ExecutorPodsAllocator(
val k8sKnownPVCNames = snapshots.flatMap(_.executorPods.values.map(_.pod)).flatMap { pod =>
pod.getSpec.getVolumes.asScala
.flatMap { v => Option(v.getPersistentVolumeClaim).map(_.getClaimName) }
}
}.distinct

// transfer the scheduler backend known executor requests from the newlyCreatedExecutors
// to the schedulerKnownNewlyCreatedExecs
Expand Down

0 comments on commit 8b2a2d1

Please sign in to comment.