-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-25341][Core] Support rolling back a shuffle map stage and re-generate the shuffle files #25620
Conversation
Test build #109907 has finished for PR 25620 at commit
|
*/ | ||
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]() | ||
private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, ArrayBuffer[Long]]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After using the map task attempt id as part of shuffle file, here we must record all the map task attempt id for each shuffle id. So comparing with the original implement, it's a memory waste here. But consider the shuffle map task number, it's maybe an accessible change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mostly it's hundreds or thousands of mappers, which is about several KB. I think it's fine. If there are many many mappers, our DAG scheduler will burn out first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, @xuanyuanking should already be aware of that 100K Mappers is not that rare for large production jobs. That would be ~10MB for single one map stage.
Maybe we should removes old shuffleId's data just like the scheduler removes old stages. However I do believe it's fine for now. Let's revise this when it actually hits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's why I highlight this question here, just because we see the huge job before. :)
For current implement of ContextCleaner, the in-memory shuffle metrics bind with JVM gc, the config spark.cleaner.periodicGC.interval
can help us.
Sure, let's keep tracking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but this map is only touched on executors, right? so its only the number of tasks which run on one executor which matter here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem too worrying at first (even 10MB per stage isn't that much overhead if it's cleaned up eventually). But perhaps using OpenHashSet
can help with larger stages (vs. an ArrayBuffer
), although it will use more memory for smaller ones.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the guidance, change it to use OpenHashSet
, and gives a smaller initial size 16
.
As @squito's suggestion in #24892 (comment), this PR reuse the map task attempt id as part of the shuffle file name. |
@@ -300,7 +300,7 @@ public ShuffleMetrics() { | |||
} | |||
|
|||
ManagedBufferIterator(FetchShuffleBlocks msg, int numBlockIds) { | |||
final int[] mapIdAndReduceIds = new int[2 * numBlockIds]; | |||
final long[] mapIdAndReduceIds = new long[2 * numBlockIds]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should one long[] for map id and one int[] for reduce id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually we already have long[] for map id and int[] for reduce id in the message, here we need is kinda assemble work to flatten reduce id and its corresponding mapid.
The current way waste memory, we can also do it in a cpu consuming way, which is for each index, calculate which map id and reduce id corresponding with the idx
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After taking a further look, I split the new protocol managed buffer iterator in
539d725, that make us more flexible to control the iterator and no more array created.
*/ | ||
ShuffleMapOutputWriter createMapOutputWriter( | ||
int shuffleId, | ||
int mapId, | ||
long mapTaskAttemptId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we name it mapId
? To be consistent with the codebase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original mapId still used in mapOutputTracker and scheduler, I doubt anybody will confused by these two ids use same name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed all mapTaskAttemptId
stuff to mapId
in 539d725.
So after the change, the mapId is the unique id for a map task. If we think it's confused to have a mapId represent the map index within a stage or a task set, mapIndex
maybe a much better name.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages"), | ||
|
||
// [SPARK-25341][CORE] Support rolling back a shuffle map stage and re-generate the shuffle files | ||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.shuffle.sort.UnsafeShuffleWriter.this"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm surprised that this is tracked by mima. It's obviously an internal class. cc @srowen @HyukjinKwon
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can probably customize what is ignored by adding some logic to GenerateMIMAIgnore
. I think it's OK to just add exclusions here too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe GenerateMIMAIgnore
isn't really ignoring classes annotated with @Private
.
Test build #109909 has finished for PR 25620 at commit
|
Test build #109940 has finished for PR 25620 at commit
|
@@ -106,7 +106,7 @@ protected void handleMessage( | |||
numBlockIds += ids.length; | |||
} | |||
streamId = streamManager.registerStream(client.getClientId(), | |||
new ManagedBufferIterator(msg, numBlockIds), client.getChannel()); | |||
new ShuffleManagedBufferIterator(msg), client.getChannel()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can also remove
numBlockIds = 0;
for (int[] ids: msg.reduceIds) {
numBlockIds += ids.length;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The numBlockIds used in callback:
callback.onSuccess(new StreamHandle(streamId, numBlockIds).toByteBuffer());
core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
@@ -706,6 +714,7 @@ object ShuffleBlockFetcherIterator { | |||
*/ | |||
private[storage] case class SuccessFetchResult( | |||
blockId: BlockId, | |||
mapId: Int, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we need the map index here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if I follow correctly, the reason is that even a SuccessFetchResult still sometimes results in a FetchFailure back to driver (eg. error decompressing the buffer). And the FetchFailure needs the mapIndex, because the mapstatus is still stored by mapIndex, so this tells us what we need to remove in the handling in DAGScheduler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's right. Here we need to guarantee all paths to throwFetchFailedException
has mapIndex pass though, even a SuccessFetchResult still can trigger fetch failed exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do I understand right that here in DAGScheduler:
spark/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Lines 1534 to 1537 in ea90ea6
} else if (mapId != -1) { | |
// Mark the map whose fetch failed as broken in the map stage | |
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) | |
} |
you'd only have mapId == 1
if using the old shuffle protocol? is that worth an assert?
I also find it tough to follow whether an id is the index within the stage or the global task id -- @cloud-fan pointed out a couple of cases where things could be named mapIndex
. Its unfortunate we already have confusing names here ... what do you think of using mapTid
consistently for all the places you mean the global id? I am at least used to seeing "TID" in spark logs for the global id, so maybe that would make it more clear?
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) | ||
taskIdMapsForShuffle.synchronized { | ||
taskIdMapsForShuffle.putIfAbsent(handle.shuffleId, ArrayBuffer.empty[Long]) | ||
taskIdMapsForShuffle.get(handle.shuffleId).append(context.taskAttemptId()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're trying to protect concurrent access to the ArrayBuffer[Long]
with that synchronized block, right? minor, but you could avoid locking the entire map, and instead do
val mapTaskIds = taskIdMapsForShuffle.putIfAbsent(handle.shuffleId, ArrayBuffer.empty[Long])
mapTaskIds.synchronized { mapTaskIds.append(context.taskAttemptId()) }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, great thanks for your advice for the optimization! Done in 4bd9e00.
(I think you mean taskIdMapsForShuffle.computeIfAbsent
, use it in the new commit.)
*/ | ||
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]() | ||
private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, ArrayBuffer[Long]]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but this map is only touched on executors, right? so its only the number of tasks which run on one executor which matter here.
@@ -706,6 +714,7 @@ object ShuffleBlockFetcherIterator { | |||
*/ | |||
private[storage] case class SuccessFetchResult( | |||
blockId: BlockId, | |||
mapId: Int, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if I follow correctly, the reason is that even a SuccessFetchResult still sometimes results in a FetchFailure back to driver (eg. error decompressing the buffer). And the FetchFailure needs the mapIndex, because the mapstatus is still stored by mapIndex, so this tells us what we need to remove in the handling in DAGScheduler.
539d725
to
4bd9e00
Compare
@squito Thanks for reviewing this, as @cloud-fan's suggestion, I'll do the follow-up work to normalize all the names by using mapIndex and mapId, mapIndex indecate the index of this map task in the task set or stage, mapId refers to the unique id for this task. WDYT? |
Test build #109986 has finished for PR 25620 at commit
|
I agree with @squito that |
|
||
@Override | ||
public boolean hasNext() { | ||
// mapIds.length must equal to reduceIds.length, and the passed in FetchShuffleBlocks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we add check logic here to be safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea if the place is not super performance critical I'd prefer a double check here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, done the double-check in 00e78b2.
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
Show resolved
Hide resolved
// Before find missing partition, do the intermediate state clean work first. | ||
// The operation here can make sure for the intermediate stage, `findMissingPartitions()` | ||
// returns all partitions every time. | ||
stage match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, why not do unregister during failure handling?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's for the scenario of ExecutorLost. While executor lost happened, there's possible for the indeterminate stage rerun triggered by submitParentStage
.
So if we only unregister during failure handling, only fetch failed stage and its parent stage do unregister, that logic would not cover the scenario of its parent's parent stage is indeterminate and have missing tasks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we can still put the unregister logic into the block:
https://github.com/apache/spark/pull/25620/files#diff-6a9ff7fb74fd490a50462d45db2d5e11R1626 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the place points out by Xingbo is just my first-time attempt :), I found that's not enough to fix this problem during doing the integrate test.
This is because the logic we calculate stagesToRollback
in collectStagesToRollback
, only care about the downstream stages of the current fetch failed stage. For upstream indeterminate stages, put the unregister logic in failure handling didn't cover. So the correctness bug will still happen.
Also the newly added UT SPARK-25341: retry all the succeeding stages when the map stage is indeterminate
also covered this check, if we do the unregister in failure handling, the UT will fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little concerned about putting this here, as you'll see lower down in this method there is some handling for the case that submitMissingTasks
is called but there are actually no tasks to run. I'm not seeing how that happens now, but your change would make those cases always re-evaluate all partitions of the stage.
I think @jiangxb1987 suggestion makes sense, couldn't you do it the unregistering there? I agree the logic is currently insufficient as its not building up the full set of stages that need to be recomputed, but maybe we need to combine both.
or maybe we understand the old cases of submitting a stage with no missing partitions and my concern is not relevant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the detailed comment.
I tried to combine both at first but seems kind of duplicated code.
For the concerned about no missing partitions stage submission, I checkedshuffleMapStage.isAvailable
for the unregister in aa3a409, what we need here is making sure the partially completed indeterminate stage will be whole stage rerun.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see people already commented on the mapId vs. mapIndex thing, so not gonna touch that.
startPartition: Int, | ||
endPartition: Int, | ||
useOldFetchProtocol: Boolean) | ||
: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent more or less. It should be at a different indent level than the method body.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, done in 3bfb6e6.
Pretty confuse before about how to address the line starting with :
, thanks for your guidance :)
core/src/main/scala/org/apache/spark/internal/config/package.scala
Outdated
Show resolved
Hide resolved
*/ | ||
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]() | ||
private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, ArrayBuffer[Long]]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem too worrying at first (even 10MB per stage isn't that much overhead if it's cleaned up eventually). But perhaps using OpenHashSet
can help with larger stages (vs. an ArrayBuffer
), although it will use more memory for smaller ones.
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
Outdated
Show resolved
Hide resolved
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages"), | ||
|
||
// [SPARK-25341][CORE] Support rolling back a shuffle map stage and re-generate the shuffle files | ||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.shuffle.sort.UnsafeShuffleWriter.this"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe GenerateMIMAIgnore
isn't really ignoring classes annotated with @Private
.
@@ -1047,6 +1047,14 @@ package object config { | |||
.checkValue(v => v > 0, "The value should be a positive integer.") | |||
.createWithDefault(2000) | |||
|
|||
private[spark] val SHUFFLE_USE_OLD_FETCH_PROTOCOL = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if you connect to an old shuffle service without setting this? Will things just fail (and always fail)?
Probably ok if they do. Although a more user-friendly error, if possible, might be good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if you connect to an old shuffle service without setting this? Will things just fail (and always fail)?
We'll always fail with the UnsupportedOperationException:Unexpected message: FetchShuffleBlocks
. This work is done in #24565.
Although a more user-friendly error, if possible, might be good.
Yeah, in this PR we had detailed log in DAGScheduler.
Let me think how to add a more user-friendly error in the follow-up work for the old shuffle service, currently, we only have some doc in the migration guide. https://github.com/apache/spark/pull/24565/files#diff-3f19ec3d15dcd8cd42bb25dde1c5c1a9R139
Great thanks for all comments, something inserted yesterday. I'll address comments from Xingbo and Vanzin, change all name related by Wenchen and Squito in today(Beijing time). |
Test build #110185 has finished for PR 25620 at commit
|
b527fe7
to
3bfb6e6
Compare
Test build #110186 has finished for PR 25620 at commit
|
Test build #110229 has finished for PR 25620 at commit
|
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
Outdated
Show resolved
Hide resolved
@@ -88,23 +88,23 @@ private class ShuffleStatus(numPartitions: Int) { | |||
* Register a map output. If there is already a registered location for the map output then it | |||
* will be replaced by the new location. | |||
*/ | |||
def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized { | |||
if (mapStatuses(mapId) == null) { | |||
def addMapOutput(mapIndex: Int, status: MapStatus): Unit = synchronized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a place that I think mapIndex
makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for pointing this out.
} | ||
|
||
/** | ||
* Remove the map output which was served by the specified block manager. | ||
* This is a no-op if there is no registered map output or if the registered output is from a | ||
* different block manager. | ||
*/ | ||
def removeMapOutput(mapId: Int, bmAddress: BlockManagerId): Unit = synchronized { | ||
if (mapStatuses(mapId) != null && mapStatuses(mapId).location == bmAddress) { | ||
def removeMapOutput(mapIndex: Int, bmAddress: BlockManagerId): Unit = synchronized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@xuanyuanking thanks for the renaming work! After taking a quick look, I think we can go further. It looks to me that we should only use the name There are only a few places that we explicitly mean What do you think? |
Sure, done this in c86f6cc, keeping |
caa949d
to
c86f6cc
Compare
Test build #110923 has finished for PR 25620 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except some code style comments
core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
Show resolved
Hide resolved
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
Show resolved
Hide resolved
*/ | ||
case class FetchRequest(address: BlockManagerId, blocks: Seq[(BlockId, Long)]) { | ||
case class FetchRequest(address: BlockManagerId, blocks: Seq[(BlockId, Long, Int)]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now it's a tuple3 with int and long elements. I think it's better to create a class for it to make the code easier to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, add FetchBlockInfo class for this in d2215b2.
Test build #110924 has finished for PR 25620 at commit
|
Test build #110930 has finished for PR 25620 at commit
|
Test build #110928 has finished for PR 25620 at commit
|
@squito @vanzin @jiangxb1987 do you have any more comments? This looks good to me now and I'd like to merge it within a few days if none of you object. |
since there is no objection, I'm merging it, thanks! |
Finally! Thank you all for the review. |
@@ -172,7 +172,7 @@ public ManagedBuffer getBlockData( | |||
String appId, | |||
String execId, | |||
int shuffleId, | |||
int mapId, | |||
long mapId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xuanyuanking why change this from int to long? Is it possible that a mapId can be greater than 2^31?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
previous the map id is the index of the mapper, and can get conflicts when we re-run the task. Now the map id is the task id, which is unique. task id needs to be long.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, after this patch, we set mapId by using the taskAttemptId
of map task, which is a unique Id within the same SparkContext. You can see the comment #25620 (comment)
After the newly added shuffle block fetching protocol in #24565, we can keep this work by extending the FetchShuffleBlocks message.
What changes were proposed in this pull request?
In this patch, we achieve the indeterminate shuffle rerun by reusing the task attempt id(unique id within an application) in shuffle id, so that each shuffle write attempt has a different file name. For the indeterministic stage, when the stage resubmits, we'll clear all existing map status and rerun all partitions.
All changes are summarized as follows:
Why are the changes needed?
This is a follow-up work for #22112's future improvment[1]:
Currently we can't rollback and rerun a shuffle map stage, and just fail.
Spark will rerun a finished shuffle write stage while meeting fetch failures, currently, the rerun shuffle map stage will only resubmit the task for missing partitions and reuse the output of other partitions. This logic is fine in most scenarios, but for indeterministic operations(like repartition), multiple shuffle write attempts may write different data, only rerun the missing partition will lead a correctness bug. So for the shuffle map stage of indeterministic operations, we need to support rolling back the shuffle map stage and re-generate the shuffle files.
Does this PR introduce any user-facing change?
Yes, after this PR, the indeterminate stage rerun will be accepted by rerunning the whole stage. The original behavior is aborting the stage and fail the job.
How was this patch tested?
It's a simple job with multi indeterminate stage, it will get a wrong answer while using old Spark version like 2.2/2.3, and will be killed after #22112. With this fix, the job can retry all indeterminate stage as below screenshot and get the right result.
