Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-40459][K8S] recoverDiskStore should not stop by existing recomputed files #37903

Closed
wants to merge 1 commit into from
Closed

Conversation

dongjoon-hyun
Copy link
Member

@dongjoon-hyun dongjoon-hyun commented Sep 16, 2022

What changes were proposed in this pull request?

This PR aims to ignore FileExistsException during recoverDiskStore processing.

Why are the changes needed?

Although recoverDiskStore is already wrapped by tryLogNonFatalError, a single file recovery exception should not block the whole recoverDiskStore .

Utils.tryLogNonFatalError {
KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, blockManager)
}

org.apache.commons.io.FileExistsException: ...
  at org.apache.commons.io.FileUtils.requireAbsent(FileUtils.java:2587)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2305)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2283)
  at org.apache.spark.storage.DiskStore.moveFileToBlock(DiskStore.scala:150)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:487)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:407)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:380)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.save(BlockManager.scala:490)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.$anonfun$recoverDiskStore$14(KubernetesLocalDiskShuffleExecutorComponents.scala:95)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.recoverDiskStore(KubernetesLocalDiskShuffleExecutorComponents.scala:91) 

Does this PR introduce any user-facing change?

No, this will improve the recover rate.

How was this patch tested?

Pass the CIs.

@dongjoon-hyun
Copy link
Member Author

This is a little difficult to have a test coverage. Could you review this additional exception catching, @viirya ?

@viirya
Copy link
Member

viirya commented Sep 16, 2022

lgtm

@dongjoon-hyun
Copy link
Member Author

Thank you, @viirya . Merged to master/3.3/3.2.

dongjoon-hyun added a commit that referenced this pull request Sep 16, 2022
…omputed files

### What changes were proposed in this pull request?

This PR aims to ignore `FileExistsException` during `recoverDiskStore` processing.

### Why are the changes needed?

Although `recoverDiskStore` is already wrapped by `tryLogNonFatalError`, a single file recovery exception should not block the whole `recoverDiskStore` .

https://github.com/apache/spark/blob/5938e84e72b81663ccacf0b36c2f8271455de292/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala#L45-L47

```
org.apache.commons.io.FileExistsException: ...
  at org.apache.commons.io.FileUtils.requireAbsent(FileUtils.java:2587)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2305)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2283)
  at org.apache.spark.storage.DiskStore.moveFileToBlock(DiskStore.scala:150)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:487)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:407)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:380)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.save(BlockManager.scala:490)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.$anonfun$recoverDiskStore$14(KubernetesLocalDiskShuffleExecutorComponents.scala:95)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.recoverDiskStore(KubernetesLocalDiskShuffleExecutorComponents.scala:91)
```

### Does this PR introduce _any_ user-facing change?

No, this will improve the recover rate.

### How was this patch tested?

Pass the CIs.

Closes #37903 from dongjoon-hyun/SPARK-40459.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit f24bb43)
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun added a commit that referenced this pull request Sep 16, 2022
…omputed files

### What changes were proposed in this pull request?

This PR aims to ignore `FileExistsException` during `recoverDiskStore` processing.

### Why are the changes needed?

Although `recoverDiskStore` is already wrapped by `tryLogNonFatalError`, a single file recovery exception should not block the whole `recoverDiskStore` .

https://github.com/apache/spark/blob/5938e84e72b81663ccacf0b36c2f8271455de292/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala#L45-L47

```
org.apache.commons.io.FileExistsException: ...
  at org.apache.commons.io.FileUtils.requireAbsent(FileUtils.java:2587)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2305)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2283)
  at org.apache.spark.storage.DiskStore.moveFileToBlock(DiskStore.scala:150)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:487)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:407)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:380)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.save(BlockManager.scala:490)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.$anonfun$recoverDiskStore$14(KubernetesLocalDiskShuffleExecutorComponents.scala:95)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.recoverDiskStore(KubernetesLocalDiskShuffleExecutorComponents.scala:91)
```

### Does this PR introduce _any_ user-facing change?

No, this will improve the recover rate.

### How was this patch tested?

Pass the CIs.

Closes #37903 from dongjoon-hyun/SPARK-40459.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit f24bb43)
Signed-off-by: Dongjoon Hyun <[email protected]>
@dongjoon-hyun dongjoon-hyun deleted the SPARK-40459 branch September 16, 2022 01:05
LuciferYang pushed a commit to LuciferYang/spark that referenced this pull request Sep 20, 2022
…omputed files

### What changes were proposed in this pull request?

This PR aims to ignore `FileExistsException` during `recoverDiskStore` processing.

### Why are the changes needed?

Although `recoverDiskStore` is already wrapped by `tryLogNonFatalError`, a single file recovery exception should not block the whole `recoverDiskStore` .

https://github.com/apache/spark/blob/5938e84e72b81663ccacf0b36c2f8271455de292/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala#L45-L47

```
org.apache.commons.io.FileExistsException: ...
  at org.apache.commons.io.FileUtils.requireAbsent(FileUtils.java:2587)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2305)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2283)
  at org.apache.spark.storage.DiskStore.moveFileToBlock(DiskStore.scala:150)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:487)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:407)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:380)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.save(BlockManager.scala:490)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.$anonfun$recoverDiskStore$14(KubernetesLocalDiskShuffleExecutorComponents.scala:95)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.recoverDiskStore(KubernetesLocalDiskShuffleExecutorComponents.scala:91)
```

### Does this PR introduce _any_ user-facing change?

No, this will improve the recover rate.

### How was this patch tested?

Pass the CIs.

Closes apache#37903 from dongjoon-hyun/SPARK-40459.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…omputed files

### What changes were proposed in this pull request?

This PR aims to ignore `FileExistsException` during `recoverDiskStore` processing.

### Why are the changes needed?

Although `recoverDiskStore` is already wrapped by `tryLogNonFatalError`, a single file recovery exception should not block the whole `recoverDiskStore` .

https://github.com/apache/spark/blob/5938e84e72b81663ccacf0b36c2f8271455de292/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala#L45-L47

```
org.apache.commons.io.FileExistsException: ...
  at org.apache.commons.io.FileUtils.requireAbsent(FileUtils.java:2587)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2305)
  at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2283)
  at org.apache.spark.storage.DiskStore.moveFileToBlock(DiskStore.scala:150)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:487)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:407)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445)
  at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:380)
  at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.save(BlockManager.scala:490)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.$anonfun$recoverDiskStore$14(KubernetesLocalDiskShuffleExecutorComponents.scala:95)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.recoverDiskStore(KubernetesLocalDiskShuffleExecutorComponents.scala:91)
```

### Does this PR introduce _any_ user-facing change?

No, this will improve the recover rate.

### How was this patch tested?

Pass the CIs.

Closes apache#37903 from dongjoon-hyun/SPARK-40459.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit f24bb43)
Signed-off-by: Dongjoon Hyun <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants