Skip to content

Commit

Permalink
[SPARK-40459][K8S] recoverDiskStore should not stop by existing rec…
Browse files Browse the repository at this point in the history
…omputed files

### What changes were proposed in this pull request?

This PR aims to ignore `FileExistsException` during `recoverDiskStore` processing.

### Why are the changes needed?

Although `recoverDiskStore` is already wrapped by `tryLogNonFatalError`, a single file recovery exception should not block the whole `recoverDiskStore` .

https://github.com/apache/spark/blob/5938e84e72b81663ccacf0b36c2f8271455de292/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala#L45-L47

```
org.apache.commons.io.FileExistsException: ...
  at org.apache.commons.io.FileUtils.requireAbsent(FileUtils.java:2587)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2305)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2283)
  at org.apache.spark.storage.DiskStore.moveFileToBlock(DiskStore.scala:150)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:487)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:407)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:380)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.save(BlockManager.scala:490)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.$anonfun$recoverDiskStore$14(KubernetesLocalDiskShuffleExecutorComponents.scala:95)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.recoverDiskStore(KubernetesLocalDiskShuffleExecutorComponents.scala:91)
```

### Does this PR introduce _any_ user-facing change?

No, this will improve the recover rate.

### How was this patch tested?

Pass the CIs.

Closes apache#37903 from dongjoon-hyun/SPARK-40459.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit f24bb43)
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dongjoon-hyun authored and sunchao committed Sep 19, 2022
1 parent 71f87c3 commit 44c31cd
Showing 1 changed file with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import java.util.Optional

import scala.reflect.ClassTag

import org.apache.commons.io.FileExistsException

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter}
Expand Down Expand Up @@ -95,6 +97,8 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging {
bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, decryptedSize).save()
} catch {
case _: UnrecognizedBlockId =>
case _: FileExistsException =>
// This may happen due to recompute, but we continue to recover next files
}
}
}
Expand Down

0 comments on commit 44c31cd

Please sign in to comment.