[SPARK-25341][Core] Support rolling back a shuffle map stage and re-generate the shuffle files #24892
Conversation
Test build #106588 has finished for PR 24892 at commit
ping @cloud-fan @gatorsmile
-      ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
+      ExecutorShuffleInfo executor, int shuffleId,
+      int mapId, int reduceId, int stageAttemptId) {
       String baseFileName = "shuffle_" + shuffleId + "_" + mapId + "_0";
Not related to your change, but do you know what _0 means here?
IIUC, it's the IndexShuffleBlockResolver.NOOP_REDUCE_ID, as described in the comment:
// No-op reduce ID used in interactions with disk store.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.
After all blocks are consolidated into a single file, we no longer use the reduceId in the shuffle file name; we just use the offsets read from the index file to locate a block in the shuffle data file.
It was from the hash shuffle algorithm from long ago -- that last id was the reducePartitionId. But now we always merge all reduce partitions into one file.
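For readers less familiar with the sort-based shuffle layout, here is a minimal sketch (an assumed helper, not the actual IndexShuffleBlockResolver code) of how a reducer's byte range is located inside the single consolidated data file via the index file:

```scala
import java.io.{DataInputStream, File, FileInputStream}

// Assumed helper for illustration: the index file written for each map task
// stores (numReducers + 1) long offsets, so reducer i's bytes occupy the range
// [offsets(i), offsets(i + 1)) of the data file. The "_0" suffix is
// NOOP_REDUCE_ID: there is one data/index file pair per map task, never one
// per reducer.
def shuffleBlockRange(dir: File, shuffleId: Int, mapId: Int, reduceId: Int): (Long, Long) = {
  val indexFile = new File(dir, s"shuffle_${shuffleId}_${mapId}_0.index")
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(reduceId * 8)   // each offset is an 8-byte long
    val start = in.readLong()
    val end = in.readLong()
    (start, end)                 // read these bytes from shuffle_<id>_<map>_0.data
  } finally {
    in.close()
  }
}
```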
Hi @xuanyuanking, thanks for your great work! Can you extend the PR description to explain what the shuffle generation id is and why we can use the stage attempt id to represent it?
Test build #106719 has finished for PR 24892 at commit
@cloud-fan Thanks Wenchen for your review and advice; the comments are addressed and the test fixes are done.
Test build #106724 has finished for PR 24892 at commit
@@ -30,7 +30,7 @@ trait BlockDataManager {
   * Interface to get local block data. Throws an exception if the block cannot be found or
   * cannot be read successfully.
   */
-  def getBlockData(blockId: BlockId): ManagedBuffer
+  def getBlockData(blockId: BlockId, shuffleGenerationId: Int): ManagedBuffer
Hmm, it's a little weird to have a shuffleGenerationId parameter in a general getBlockData method.
Fully agree. Actually, in the original patch I added a new function named getShuffleBlockData, intended only for getting shuffle block data. But I found that, to support the old way of fetching blocks via OpenBlocks, we still need the logic for fetching shuffle block data without a shuffle generation id in the getBlockData function, so I reverted and implemented it as in the current version. Does Wenchen have any suggestion?
We can keep getBlockData unchanged, and add a new getShuffleBlockData method. For the old shuffle protocol, we still call getBlockData; otherwise, call getShuffleBlockData.
Thanks for the suggestion, done this in 96bab1e.
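A minimal sketch of the suggested split, with simplified signatures (the method names come from the discussion above; the exact merged code may differ):

```scala
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.BlockId

// Sketch only: keep the generic lookup untouched and add a shuffle-specific one.
trait BlockDataManagerSketch {
  // Unchanged path, used by the legacy OpenBlocks-based protocol and by all
  // non-shuffle block reads.
  def getBlockData(blockId: BlockId): ManagedBuffer

  // New path for the new fetch protocol: the generation id selects which set
  // of shuffle files (original attempt vs. indeterminate rerun) to read.
  def getShuffleBlockData(blockId: BlockId, shuffleGenerationId: Int): ManagedBuffer
}
```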
    // First figure out the indexes of partition ids to compute.
    // Before finding the missing partitions, do the intermediate-state cleanup first.
    stage match {
      case sms: ShuffleMapStage if stage.isIndeterminate =>
Shall we add if stage.latestInfo.attemptNumber() > 0 as well?
That would cause a correctness problem, because we haven't called makeNewStageAttempt yet at this point.
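For context, a rough sketch (simplified, not the exact DAGScheduler code) of the rollback step being discussed here, assuming a MapOutputTrackerMaster handle is available:

```scala
import org.apache.spark.MapOutputTrackerMaster
import org.apache.spark.scheduler.{ShuffleMapStage, Stage}

// Sketch: when an indeterminate shuffle map stage is resubmitted, drop all of
// its registered map outputs so that findMissingPartitions() reports every
// partition as missing, instead of only the ones lost to the fetch failure.
def partitionsToCompute(stage: Stage, tracker: MapOutputTrackerMaster): Seq[Int] = {
  stage match {
    case sms: ShuffleMapStage if stage.isIndeterminate =>
      tracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
    case _ =>
      // Deterministic stages keep their existing outputs and only recompute
      // the partitions that are actually missing.
  }
  stage.findMissingPartitions()
}
```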
Test build #106807 has finished for PR 24892 at commit
    String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
    if (indeterminateBlock) blockId += "_0";
_0 is a little confusing here. Maybe we should have a shuffleGenerationId parameter (by default -1), instead of indeterminateBlock.
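A tiny sketch of the block naming this suggestion implies, with a default generation id of -1 (the helper name is hypothetical):

```scala
// Hypothetical helper for illustration: -1 means a deterministic shuffle with
// no generation suffix; a generation id >= 0 is appended to the block name.
def shuffleBlockName(shuffleId: Int, mapId: Int, shuffleGenerationId: Int = -1): String = {
  val base = s"shuffle_${shuffleId}_${mapId}_0"
  if (shuffleGenerationId >= 0) s"${base}_$shuffleGenerationId" else base
}

// shuffleBlockName(5, 2)     => "shuffle_5_2_0"
// shuffleBlockName(5, 2, 1)  => "shuffle_5_2_0_1"
```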
@@ -168,7 +180,9 @@ private FetchResult fetchBlocks(
       String execId,
       String[] blockIds,
       TransportConf clientConf,
-      int port) throws Exception {
+      int port,
+      int shuffleGenerationId,
Maybe just an Integer shuffleGenerationId, and use null to indicate fetching a data block.
@@ -157,9 +158,20 @@ public void releaseBuffers() {
     }
   }

-  // Fetch a set of blocks from a pre-registered executor.
+  // Fetch a set of shuffle blocks with default generation id -1 from a pre-registered executor.
   private FetchResult fetchBlocks(String execId, String[] blockIds) throws Exception {
Shall we rename it to fetchShuffleBlocks?
  }

  // Fetch a set of shuffle blocks from a pre-registered executor.
  private FetchResult fetchBlocks(
ditto
-      TransportConf transportConf) {
+      TransportConf transportConf,
+      boolean useShuffleBlockFetcher,
+      int shuffleGenerationId) {
Ditto, shall we use a single Integer shuffleGenerationId parameter?
@@ -39,16 +39,20 @@
   /**
    * Called once per map task to create a writer that will be responsible for persisting all the
    * partitioned bytes written by that map task.
-   * @param shuffleId Unique identifier for the shuffle the map task is a part of
+   * @param shuffleId Unique identifier for the shuffle the map task is a part of
+   * @param shuffleGenerationId The shuffle generation ID of the stage that this task belongs to,
How about: "The generation ID of the shuffle, which is always -1 if the shuffle map stage is deterministic. Otherwise, it starts at 0 and increases when the shuffle map stage gets retried."
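Expressed as a tiny helper (an assumption for illustration, not code from the PR), the convention in this suggestion would be:

```scala
// Assumed helper: -1 for deterministic shuffle map stages; for indeterminate
// stages the generation follows the stage attempt number, so it starts at 0
// and increases each time the stage is retried.
def shuffleGenerationId(isIndeterminate: Boolean, stageAttemptNumber: Int): Int =
  if (isIndeterminate) stageAttemptNumber else -1

// shuffleGenerationId(isIndeterminate = false, stageAttemptNumber = 3)  // -1
// shuffleGenerationId(isIndeterminate = true,  stageAttemptNumber = 0)  //  0
// shuffleGenerationId(isIndeterminate = true,  stageAttemptNumber = 2)  //  2
```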
          abortStage(mapStage, reason, None)
        } else {
          logInfo(s"The indeterminate stage $mapStage will be resubmitted," +
            " the stage self and all indeterminate parent stage will be" +
the stage itself
all indeterminate parent stage => all parent stages. We need to rerun the parent stages even if they are deterministic.
@@ -28,7 +28,8 @@ private[spark] class TaskSet(
     val stageId: Int,
     val stageAttemptId: Int,
     val priority: Int,
-    val properties: Properties) {
+    val properties: Properties,
+    val isIndeterminateRerun: Boolean = false) {
How about allowSpeculativeTask: Boolean = true?
and add some comments to explain when we don't allow speculative tasks.
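A sketch of what the suggested rename and comment could look like; this reflects the reviewer's suggestion at this point in the review, not necessarily the merged code (note the later discussion where speculation was deemed safe after all):

```scala
import java.util.Properties
import org.apache.spark.scheduler.Task

private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,
    val properties: Properties,
    // False when this task set is the rerun of an indeterminate shuffle map
    // stage, where (at this point of the review) the PR avoided launching
    // speculative copies of the map tasks.
    val allowSpeculativeTask: Boolean = true)
```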
  }

  test("SPARK-25341: continuous indeterminate stage roll back") {
    // shuffleMapRdd1/2/3 are all indeterminate.
How about we make shuffleMapRdd3 deterministic? To prove that all parent stages need to be rerun even if they are deterministic.
    val mapId = 2
    val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
  private def testWithIndexShuffleBlockResolver(
      shuffleId: Int, mapId: Int, idxName: String, generationId: Int): Unit = {
Nit: shall we put generationId right after shuffleId?
   */
  ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId,
      int shuffleGenerationId,
shuffleGenerationId is a bit long, how about just generationId?
LGTM except a few minor comments, thanks for fixing it!
Sorry I don't understand -- why don't you want to support speculative execution? Correctness before performance, yes. On the assumptions about task independence -- yes, that was the assumption, before this whole thread of issues related to non-deterministic tasks and stage retry. Fetch failures are more likely on large clusters & large workloads, precisely where speculative execution is important too. If we couldn't get it to work together, then I would totally agree we should go for correctness. But I think using the global TID would give us the behavior we want. I also think the global TID is simpler. For one, debugging is simpler -- the shuffle id is actually the shuffle id; there isn't some other state that is tracked separately to know which block to get. If you're really just concerned about the overhead of an additional field in the shuffle block, I think you could even swap out the original map partition id for the TID of the map task (though that would be more complex in other ways, the …)
After another look, I think speculative tasks are OK. When we run an indeterminate shuffle map stage, it's always a fresh run (either the first run, or a retry that reruns all the downstream stages). Sorry about missing it before. It's fine to write shuffle files with speculative tasks: the shuffle map task writes to a temp file first, and then tries to rename the temp file to the formal shuffle file name. I think it's a good idea to use the TID instead of the partition ID to represent mapId; then there are no file name conflicts anymore. We can keep the shuffle protocol unchanged, but there will be a little overhead. One concern is that it will be hard to test, since now we need to query … I think this is worth a discussion, cc @vanzin @tgravescs @jiangxb1987
Thanks for taking another look. Another thing to keep in mind w/ speculative shuffle tasks -- the scheduler never puts speculative tasks on the same host as the original task. That ensures the speculative task isn't trying to write to the same disk with local shuffle storage. But, that doesn't help at all with distributed shuffle storage (and doesn't help deal w/ zombie tasks etc.)
So I'm trying to page in enough context about all this, but I can't shake this feeling that I'm missing something about speculative tasks in non-deterministic stages being safe. The code that triggers me is this, in …
That seems to be blindly overwriting an existing task's output with the new one. Wouldn't that mean that a speculative task could replace the output of another task after the stage has finished (and thus after the next stage started running)? The stage is marked as finished as soon as there are output blocks for all the partitions, and at that point there may still be speculative tasks that haven't reported back. I believe in that case the driver makes an effort to kill them, but what if that task result arrives first? (There's a check at the very next line where it checks …) Sorry if I'm missing something obvious. Need to spend more time to fully understand this. (I also realize that what I'm commenting on isn't necessarily caused by this particular PR or would change by the latest suggestions, but rather is an existing thing.)
@vanzin I think your concern is valid. It seems the shuffle writing policy contradicts itself: if a partition has multiple shuffle write tasks, 1) if they are on the same node (e.g. a stage becomes a zombie and is rerun), the first write wins; 2) if they are on different nodes (speculative tasks), the last write wins. I think we should stick with "first write wins". As you said, we should only update the map status if the task is the first one that completes.
@vanzin I checked the code: spark/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala, lines 1355 to 1365 at commit 1b416a0.
Spark will ignore late speculative tasks that complete after the stage is completed. So at least the worst case won't happen: a speculative task replacing the output of another task after the stage has finished (and thus after the next stage started running). But we still have the contradiction: on the executor side the first shuffle write wins, while on the driver side the last shuffle write wins. When we run an indeterminate stage, the downstream stages are always fresh (when we rerun an indeterminate stage, the scheduler rolls back all downstream stages). So it doesn't matter which shuffle write wins, as long as the shuffle write is atomic. We'd better fix this contradiction, but it doesn't cause any real problems.
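To make the "it doesn't matter which write wins, as long as the write is atomic" point concrete, here is a rough sketch in the spirit of IndexShuffleBlockResolver's commit step; the names and structure are simplified assumptions, not the actual Spark code:

```scala
import java.io.{File, IOException}

// Sketch only: each task attempt writes to a temp file and then commits it
// under a lock. If another attempt already committed, the new output is
// discarded, so the first successful writer wins and readers never observe a
// partially written shuffle file.
object ShuffleCommitSketch {
  def commit(tempFile: File, finalFile: File): Unit = synchronized {
    if (finalFile.exists()) {
      tempFile.delete()   // another attempt (e.g. a speculative copy) won the race
    } else if (!tempFile.renameTo(finalFile)) {
      throw new IOException(s"Failed to rename $tempFile to $finalFile")
    }
  }
}
```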
Thanks for looking into this @vanzin and @cloud-fan. I agree it's unnecessarily confusing; it would be good to make this consistent.
Thanks for looking at the speculation-related code. It's a little confusing at times. Another idea: instead of introducing a …, e.g. when an indeterminate stage fails, you call into the … The DAGScheduler might need some adjustments, but other than that, wouldn't this approach also solve the problem? And without needing to change the way block IDs are used anywhere.
Hmm, never mind, that doesn't account for the existing files from the old shuffle, which is the main change in this PR (changing the file names so that they contain the shuffle generation id).
@cloud-fan could you clarify this comment?
What tests are you thinking about? The way I see Imran's suggestion (encode the task ID in the …), … Maybe there are some tests that make assumptions about how the … On an unrelated note, just a note about my comments above: it seems that this could also be fixed by using a "last write wins" approach on the executor side (invalidate all outputs from the previous stage attempt in the tracker, and tasks from the new stage attempt would overwrite files generated by the previous attempt). But that sounds dangerous (and hard to make sure all is working as it should), so the unique file name based on the task ID sounds like a good balance between complexity and size of changes. (It would shift a lot of the changes to the test code, if what Wenchen says really is a problem, but I prefer that to more changes in the main code.)
Oh, one more thing: changing from partition ID to task ID in ShuffleBlockId & friends would still qualify as a change in the shuffle service protocol, since there's a type change from int to long, and a lot of code in the shuffle service assumes that the id will be an integer. It should still work in a lot of cases with an old shuffle service, but would break when task IDs exceed the int range.
Great, thanks @squito for the idea of reusing the task attempt id as the map id; this significantly reduces the code changes. I reimplemented this in #25620. Besides the test changes, and the map status needing to carry the map task attempt id, I found maybe the last overhead during this work: it's about the SortShuffleManager, where we need to record all the map task ids, while before we only kept the number of map tasks. Let's discuss this here.
Thanks @vanzin for the reminder; compatibility with the external shuffle service is definitely an important consideration. We'll only do this extension for the new shuffle protocol. Thanks for the work in #24565; we can stay compatible with an old external shuffle service by using the old protocol, and you can see the corresponding implementation here.
With the shuffle block fetching protocol newly added in #24565, we can continue this work by extending the FetchShuffleBlocks message.
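A rough sketch of what such an extension could look like (rendered in Scala for illustration; the real message is a Java class in the network-shuffle module, and any field beyond those discussed in the review is an assumption):

```scala
// Sketch only: because FetchShuffleBlocks is a structured request rather than
// a list of block-id strings, an extra field such as the shuffle generation id
// can be carried without changing how block ids are parsed elsewhere.
case class FetchShuffleBlocksSketch(
    appId: String,
    execId: String,
    shuffleId: Int,
    shuffleGenerationId: Int,       // -1 for deterministic shuffle map stages
    mapIds: Array[Int],             // which map outputs to read
    reduceIds: Array[Array[Int]])   // reduce partitions requested, per map id
```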
What changes were proposed in this pull request?
This is a follow-up work for #22112's future improvement[1]:
Currently we can't roll back and rerun a shuffle map stage; we just fail.
Spark reruns a finished shuffle map stage when it hits fetch failures. Currently, the rerun stage only resubmits tasks for the missing partitions and reuses the outputs of the other partitions. This logic is fine in most scenarios, but for indeterminate operations (like repartition), different shuffle write attempts may write different data, so rerunning only the missing partitions can lead to a correctness bug. Therefore, for the shuffle map stage of indeterminate operations, we need to support rolling back the shuffle map stage and regenerating the shuffle files. In this patch, we achieve this by adding an indeterminate tag to the stage; when an indeterminate stage is resubmitted, we clear all existing map statuses and rerun all partitions. We also add properties to the task's LocalProperties to record that the stage is a retried indeterminate stage, so that shuffle files generated by a retried indeterminate stage keep the shuffle generation id in their file names, and the corresponding reduce side specifies which shuffle generation id it wants to read. The shuffle generation id marks how many times the stage has been retried, so we reuse the stage attempt id as the shuffle generation id when an indeterminate stage is rerun.
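A small sketch of the local-property plumbing described above; the property key and helper are illustrative assumptions, not the exact names used in the patch:

```scala
import org.apache.spark.TaskContext

// Illustrative only: the scheduler attaches the generation id of the retried
// indeterminate stage to the tasks' local properties, and the shuffle writer
// reads it back when building the shuffle file name.
def currentShuffleGenerationId(): Int = {
  // Hypothetical property key; a missing key means a deterministic stage.
  val prop = TaskContext.get().getLocalProperty("spark.shuffle.generationId")
  if (prop == null) -1 else prop.toInt
}
```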
All changes are summarized as follows:
How was this patch tested?
It's a simple job with multiple indeterminate stages. It gets a wrong answer with old Spark versions like 2.2/2.3, and is killed after #22112. With this fix, the job can retry all indeterminate stages (see the screenshot below) and get the right result.
