[SPARK-25341][Core] Support rolling back a shuffle map stage and re-generate the shuffle files #24110
Conversation
Test this please.
Test build #103528 has finished for PR 24110 at commit
To help other people review this patch, can you add the following information in your PR description?
A side question: can we skip the temp file when writing shuffle files? It was introduced in #9610 and seems unnecessary when the shuffle files have the attempt number in the name.
Thanks Wenchen, the PR description is updated.
Why do we need this tag? For a deterministic map stage, we will never roll it back, will we?
Yes Wenchen, I also mentioned this just now in the answer to your side question.
That's what I expect. My initial idea is to extend the shuffle id with a generation number, so the shuffle id becomes an (int, int) instead of a single int. With the shuffle id change, the shuffle block id also changes, as well as other places that refer to the shuffle id, e.g. the map output tracker and maybe the shuffle file name. I think it's a much simpler design compared to doing different things for different deterministic levels.
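For illustration only, a minimal sketch of what such a generation-extended shuffle id could look like; ShuffleGenerationId, its fields, and the block-name format below are hypothetical and not code from this PR:

```scala
// Hypothetical sketch of the generation-extended shuffle id proposed above.
// Today a shuffle is identified by a single Int; here it becomes (id, generation),
// and the generation would be bumped every time an indeterminate map stage is
// rolled back, so outputs of different generations never collide.
case class ShuffleGenerationId(shuffleId: Int, generation: Int) {
  // A block name that embeds the generation number.
  def blockName(mapId: Int, reduceId: Int): String =
    s"shuffle_${shuffleId}_${generation}_${mapId}_${reduceId}"
}

object ShuffleGenerationIdDemo {
  def main(args: Array[String]): Unit = {
    val first = ShuffleGenerationId(shuffleId = 3, generation = 0)
    val retried = first.copy(generation = first.generation + 1) // after a rollback
    println(first.blockName(mapId = 5, reduceId = 7))   // shuffle_3_0_5_7
    println(retried.blockName(mapId = 5, reduceId = 7)) // shuffle_3_1_5_7
  }
}
```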
Copy that, I'll try it this way in the following commit.
I met some problems with this approach: the shuffleId is created in the shuffle dependency, where there is no generation number yet. Maybe we still need to keep the generation number in ShuffleStatus.
Test build #104049 has finished for PR 24110 at commit
@cloud-fan Thanks Wenchen for the guidance, it's a much simpler and cleaner implementation to extend the shuffle block id for all cases.
Force-pushed from 62edf82 to b17eaf3.
Test build #104051 has finished for PR 24110 at commit
@@ -1231,7 +1234,8 @@ private[spark] class DAGScheduler(
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
What about making Stage#nextAttemptId readable and using it instead of stage.latestInfo.attemptNumber?
Maybe stage.latestInfo.attemptNumber is better for readability; nextAttemptId = latestInfo.attemptNumber + 1.
Hi @xuanyuanking, thanks for your work here. Can you please explain how your PR takes care of multiple write attempts? I am especially interested in how this takes care of resolving write attempts across two different stage attempts. From the code, I could not see that resolution happening anywhere, but I could be wrong.
@ankurcha the general idea is to let the driver(
Thanks @cloud-fan for your reply. That makes sense, and I saw that you posted that comment earlier as well: #24110 (comment). But it seems that the current PR doesn't work that way.
Sorry for the delay, some other things came up these days; I'll finish this tomorrow.
Force-pushed from b17eaf3 to 012ae1d.
@ankuriitg Thanks for your question. As Wenchen answered, the general idea is to reuse the stage attempt id as the "generation id" for each indeterminate shuffle map task; the output file name is extended with the "generation id" when multiple write attempts happen. You can check the corresponding code in all shuffle writers, e.g. SortShuffleWriter.
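To make that naming idea concrete, here is a minimal sketch under the assumption that the "generation id" is simply appended to the data file name; the helper and file-name format are illustrative only, the real logic lives in the shuffle writers:

```scala
// Illustrative sketch only: how a shuffle data file name could be extended with
// the stage attempt id ("generation id") when an indeterminate stage is retried.
// The helper and names here are hypothetical, not the PR's actual code.
object ShuffleFileNaming {
  def shuffleDataFileName(
      shuffleId: Int,
      mapId: Int,
      indeterminateAttemptId: Option[Int]): String = {
    val base = s"shuffle_${shuffleId}_${mapId}_0.data"
    indeterminateAttemptId match {
      case Some(attemptId) => s"$base.$attemptId" // retried indeterminate stage
      case None            => base                // determinate stage: unchanged
    }
  }

  def main(args: Array[String]): Unit = {
    // The first attempt of the stage writes the plain name; a retry of an
    // indeterminate stage writes files tagged with the new stage attempt id,
    // and the reduce side asks for exactly that attempt.
    println(shuffleDataFileName(2, 11, None))    // shuffle_2_11_0.data
    println(shuffleDataFileName(2, 11, Some(1))) // shuffle_2_11_0.data.1
  }
}
```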
@cloud-fan As per our offline discussion, thanks for your excellent idea of keeping the "generation id" by reusing the task's local properties; that avoids a lot of interface changes. The newest commit f88f9d6 also keeps the behavior of only using the extended block id for a retried indeterminate stage. I think it's ready for review now.
Test build #104265 has finished for PR 24110 at commit
retest this please.
 * attempt number when the stage is indeterminate, and returns None otherwise.
 */
def indeterminateStageAttemptId(shuffleId: Int): Option[Int] = {
  val id = getLocalProperty(SparkContext.INDETERMINATE_STAGE_ATTEMPT_ID_PREFIX + shuffleId)
do we need to include shuffle id?
I think we should include the shuffle id here. For an indeterminate shuffle map task whose parent stage is also indeterminate, we need both the parent stage's attempt id and the current one for the shuffle reader and writer. Also, in more complex scenarios such as join/cogroup operations, a single shuffle map stage may have multiple indeterminate parents.
makes sense
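To illustrate why the shuffle id has to be part of the property key, here is a toy model of per-shuffle attempt ids carried in a task's local properties; the key prefix, map, and helper below are placeholders mirroring the snippet above, not verbatim PR code:

```scala
// Toy model: local properties carrying one "indeterminate stage attempt id"
// per shuffle id. A join/cogroup task may read several indeterminate parent
// shuffles and write its own output shuffle, so a single un-keyed property
// would not be enough.
object LocalPropsModel {
  // Placeholder key prefix; the real constant lives in SparkContext.
  val Prefix = "spark.indeterminate.stage.attempt.id."

  def main(args: Array[String]): Unit = {
    // Pretend the driver set these before submitting the task set:
    val localProps = Map(
      Prefix + "1" -> "2", // parent shuffle 1 was retried: read attempt 2
      Prefix + "4" -> "1"  // this stage's own output shuffle 4: write attempt 1
    )

    def indeterminateStageAttemptId(shuffleId: Int): Option[Int] =
      localProps.get(Prefix + shuffleId).map(_.toInt)

    println(indeterminateStageAttemptId(1)) // Some(2) -> reader fetches attempt 2
    println(indeterminateStageAttemptId(4)) // Some(1) -> writer tags files with attempt 1
    println(indeterminateStageAttemptId(7)) // None    -> determinate shuffle, plain names
  }
}
```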
Test build #104275 has finished for PR 24110 at commit
Test build #104287 has finished for PR 24110 at commit
Test build #104435 has started for PR 24110 at commit
retest this please.
Test build #104522 has finished for PR 24110 at commit
After #24359, this PR needs a little re-implementation, and the new parameter in TaskSet is no longer needed. I'm working on this and will push a commit soon after manually running the integration test.
Force-pushed from a1e6d0c to b8bc611.
Test build #104806 has finished for PR 24110 at commit
PR #25620 fixes this issue.
In the current approach, OneForOneBlockFetcher reuses the OpenBlocks protocol to describe the fetch request for shuffle blocks, which makes extension work for shuffle fetching, like apache#19788 and apache#24110, very awkward. In this PR, we split the fetch request for shuffle blocks out of OpenBlocks into a new protocol named FetchShuffleBlocks. It is loosely bound to ShuffleBlockId and can easily be extended by adding new fields to the protocol. Tested with existing and newly added UTs.
Closes apache#24565 from xuanyuanking/SPARK-27665.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 8949bc7)
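For context, a rough sketch of the shape of such a dedicated fetch request, written as a plain Scala case class rather than the actual Java protocol class; the field names follow the description above but should be treated as illustrative:

```scala
// Illustrative model of a dedicated shuffle-fetch request: instead of a flat
// list of block-id strings (the OpenBlocks style), the request is structured
// by shuffle, map output, and reduce partitions, so it can grow new fields
// without re-encoding everything into block-id strings.
case class FetchShuffleBlocksRequest(
    appId: String,
    execId: String,
    shuffleId: Int,
    mapIds: Array[Int],           // one entry per requested map output
    reduceIds: Array[Array[Int]]) // reduce partitions requested per map output

object FetchShuffleBlocksDemo {
  def main(args: Array[String]): Unit = {
    val req = FetchShuffleBlocksRequest(
      appId = "app-1", execId = "exec-3", shuffleId = 2,
      mapIds = Array(0, 5),
      reduceIds = Array(Array(1, 2), Array(1)))
    // The equivalent flat block ids under the old OpenBlocks-style encoding:
    val flat = req.mapIds.zip(req.reduceIds).flatMap { case (m, rs) =>
      rs.map(r => s"shuffle_${req.shuffleId}_${m}_$r")
    }
    flat.foreach(println) // shuffle_2_0_1, shuffle_2_0_2, shuffle_2_5_1
  }
}
```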
What changes were proposed in this pull request?
This is a follow-up work for #22112's future improvement[1]:
Currently we can't roll back and rerun a shuffle map stage; we just fail.
Spark reruns a finished shuffle write stage when it meets fetch failures. Currently, the rerun shuffle map stage only resubmits tasks for the missing partitions and reuses the output of the other partitions. This logic is fine in most scenarios, but for indeterminate operations (like repartition), multiple shuffle write attempts may write different data, so rerunning only the missing partitions can lead to a correctness bug. For the shuffle map stage of indeterminate operations we therefore need to support rolling back the shuffle map stage and re-generating the shuffle files. In this patch we achieve this by adding an indeterminate tag to the stage: when an indeterminate stage is resubmitted, we clear all of its existing map statuses and rerun all partitions. We also add properties to the task's LocalProperties to record the information for the retried indeterminate stage, so that shuffle files generated by a retried indeterminate stage keep the attempt id in their file names, and the corresponding reduce side specifies which attempt of the shuffle it wants to read.
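A self-contained toy model of the resubmission rule described above, assuming a simplified two-level determinism flag; this is not DAGScheduler code and all names are illustrative:

```scala
// Toy model of the rollback rule: when a shuffle map stage has to be rerun
// after a fetch failure, a determinate stage only recomputes the missing
// partitions, while an indeterminate stage drops all of its map outputs and
// recomputes every partition, because a new attempt may produce different data.
object RollbackModel {
  sealed trait DeterministicLevel
  case object Determinate extends DeterministicLevel
  case object Indeterminate extends DeterministicLevel

  def partitionsToRerun(
      level: DeterministicLevel,
      numPartitions: Int,
      missing: Set[Int]): Set[Int] = level match {
    case Determinate   => missing                       // reuse surviving outputs
    case Indeterminate => (0 until numPartitions).toSet // roll back the whole stage
  }

  def main(args: Array[String]): Unit = {
    val missing = Set(3, 7) // outputs lost with the failed executor
    println(partitionsToRerun(Determinate, 10, missing))   // Set(3, 7)
    println(partitionsToRerun(Indeterminate, 10, missing)) // Set(0, 1, ..., 9)
  }
}
```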
All changes are summarized as follows:
How was this patch tested?
It's a simple job with multiple indeterminate stages: it gets a wrong answer on old Spark versions like 2.2/2.3, and is killed after #22112. With this fix, the job can retry all indeterminate stages, as in the screenshot below, and get the right result. (A sketch of the shape of such a job follows the screenshot.)
