-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-28625][Core] Indeterminate shuffle support in Shuffle Writer API #25361
Conversation
Add indeterminate stage rerun support in shuffle writer api (cherry picked from commit 99c2b4a) Signed-off-by: Yuanjian Li <[email protected]>
Test build #108662 has finished for PR 25361 at commit
|
Test build #108663 has finished for PR 25361 at commit
|
@@ -39,16 +39,20 @@ | |||
/** | |||
* Called once per map task to create a writer that will be responsible for persisting all the | |||
* partitioned bytes written by that map task. | |||
* @param shuffleId Unique identifier for the shuffle the map task is a part of | |||
* @param shuffleId Unique identifier for the shuffle the map task is a part of | |||
* @param shuffleGenerationId The shuffle generation ID of the stage that this task belongs to, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't we just use the mapTaskAttemptId
? (In fact I wonder if we can just remove shuffleId
and mapId
and just use mapTaskAttemptId
as a global identifier, but that might be a bit ambitious.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Specifically taking a look at the linked PR for indeterminate retries - I'd expect that on a rolled back map stage, the implementation of this plugin will be given a different mapTaskAttemptId
anyways since that's going to be updated on the resubmit. So, we'll end up opening a new writer regardless, but, we could have gotten the same behavior just by using the mapTaskAttemptId
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually let's move discussion over to #24892
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, thanks for your faster review, I describe the requirement in #24892 (comment)
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents | ||
.createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions); | ||
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter( | ||
shuffleId, -1, mapId, mapTaskAttemptId, numPartitions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we always passing in -1 here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR created for quick reviewing of API changes, you can see the real scenario of shuffleGeneraionId here :)
Close this preview PR, the API changed in #25620, which uses map task attempt Id as the map id during writer creation. |
What changes were proposed in this pull request?
Add indeterminate stage rerun support in shuffle writer API, the usage of newly added
shuffleGenerationId
param is in #24892.How was this patch tested?
Existing UT.