
[SPARK-25341][Core] Support rolling back a shuffle map stage and re-generate the shuffle files #24110

Closed
wants to merge 16 commits

Conversation

xuanyuanking
Member

@xuanyuanking xuanyuanking commented Mar 15, 2019

What changes were proposed in this pull request?

This is follow-up work for #22112's future improvement [1]: currently we can't roll back and rerun a shuffle map stage; the job just fails.

Spark reruns a finished shuffle map stage when it meets fetch failures. Currently, the rerun stage only resubmits tasks for missing partitions and reuses the output of the other partitions. This logic is fine in most scenarios, but for indeterminate operations (like repartition), multiple shuffle write attempts may write different data, so rerunning only the missing partitions leads to a correctness bug. For the shuffle map stage of indeterminate operations, we therefore need to support rolling back the shuffle map stage and re-generating the shuffle files. In this patch, we achieve this by adding an indeterminate tag to the stage: when an indeterminate stage resubmits, we clear all existing map status and rerun all partitions. We also add properties in the task's LocalProperties to record the retried indeterminate stage, so the shuffle files generated by a retried indeterminate stage keep the attempt id in their file names, and the corresponding reduce side can specify which attempt of the shuffle it wants to read.

All changes are summarized as follows:

  • Extend ShuffleBlockId with indeterminateAttemptId.
  • Add corresponding support in ShuffleBlockResolver: if a shuffle file is generated by an indeterminate stage, its name contains the indeterminateAttemptId; otherwise the file name stays as before.
  • Add the retried indeterminate stage info to TaskContext.localProperties and use it in the shuffle reader and writer.
  • Add the determinate flag to Stage and use it in DAGScheduler and in the cleanup of intermediate stages.
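As a rough illustration of the first two bullets, here is a hedged sketch (not the exact code in the patch; the optional-field shape and naming are assumptions) of a block id that appends the indeterminate attempt id to the name only when present:

```scala
// Illustrative sketch: a ShuffleBlockId carrying an optional
// indeterminateAttemptId. When the attempt id is present (the stage was
// indeterminate and retried), it is appended to the block/file name so
// different stage attempts never collide on disk; otherwise the legacy
// name is kept.
case class ShuffleBlockId(
    shuffleId: Int,
    mapId: Int,
    reduceId: Int,
    indeterminateAttemptId: Option[Int] = None) {
  def name: String = {
    val base = s"shuffle_${shuffleId}_${mapId}_$reduceId"
    indeterminateAttemptId.map(a => s"${base}_$a").getOrElse(base)
  }
}

// First attempt keeps the legacy name, a retried indeterminate stage
// gets a suffixed one.
println(ShuffleBlockId(0, 1, 2).name)          // shuffle_0_1_2
println(ShuffleBlockId(0, 1, 2, Some(1)).name) // shuffle_0_1_2_1
```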

How was this patch tested?

  • UT: add unit tests for all changed code and newly added functions.
  • Manual test: the following manual test verifies the effect.
import scala.sys.process._
import org.apache.spark.TaskContext

val determinateStage0 = sc.parallelize(0 until 1000 * 1000 * 100, 10)
val indeterminateStage1 = determinateStage0.repartition(200)
val indeterminateStage2 = indeterminateStage1.repartition(200)
val indeterminateStage3 = indeterminateStage2.repartition(100)
val indeterminateStage4 = indeterminateStage3.repartition(300)
val fetchFailIndeterminateStage4 = indeterminateStage4.map { x =>
  // On the first attempt of partition 190, kill the newest java process
  // (an executor) to trigger a fetch failure in the downstream stage.
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 190 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val indeterminateStage5 = fetchFailIndeterminateStage4.repartition(200)
val finalStage6 = indeterminateStage5.repartition(100).collect().distinct.length

It's a simple job with multiple indeterminate stages. It gets a wrong answer on old Spark versions like 2.2/2.3, and is killed after #22112. With this fix, the job retries all indeterminate stages, as shown in the screenshot, and gets the right result.

@xuanyuanking
Member Author

xuanyuanking commented Mar 15, 2019

Test this please.

@xuanyuanking
Member Author

cc @cloud-fan @gatorsmile.

@SparkQA

SparkQA commented Mar 15, 2019

Test build #103528 has finished for PR 24110 at commit a9a3091.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

To help other people review this patch, can you add the following information in your PR description?

  1. How the current shuffle works regarding multiple write attempts. This can happen when we retry a map write task, or when we retry the entire map stage.
  2. What's the problem we are trying to fix in the current shuffle. (because of non-deterministic operations, multiple shuffle write attempts may write different data)
  3. What's the new proposal and how it solves the problem.

A side question: can we skip the temp file when writing shuffle files? It was introduced in #9610 and seems unnecessary when the shuffle files have the attempt number in the name.

@xuanyuanking
Member Author

xuanyuanking commented Mar 19, 2019

To help other people review this patch, can you add the following information in your PR description?

Thanks Wenchen, the PR description is updated.

A side question: can we skip the temp file when writing shuffle files?

Let me check this; it's related to https://issues.apache.org/jira/browse/SPARK-8029.
With the current implementation, the answer is no, because we only keep the attempt id for indeterminate stages.
After further checking, I think we can achieve the goal of skipping the temp file if we need to, by doing two more things:

  • All the block ids should be extended with the attempt id, not only the indeterminate ones.
  • We should keep the mapping of (stage, partition) -> attemptId in MapOutputTracker, because for a determinate stage we can see different stage attempt ids within a single stage when fetch failures happen. The current implementation just keeps a shuffleId -> attemptId mapping for each stage and only uses it for indeterminate stages.
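A hedged sketch of the bookkeeping described in the second bullet (the class and method names are illustrative, not from the patch): the driver would remember which attempt produced the registered output of each (shuffleId, partition), since a determinate stage can mix outputs from several stage attempts after a fetch failure.

```scala
import scala.collection.mutable

// Illustrative registry: maps (shuffleId, partition) -> attemptId so the
// reduce side can ask for the exact file name a given map output was
// written under, even when a determinate stage mixes attempts.
class AttemptRegistry {
  private val attemptByPartition = mutable.Map.empty[(Int, Int), Int]

  // Record which stage attempt produced the output of this partition.
  def register(shuffleId: Int, partition: Int, attemptId: Int): Unit =
    attemptByPartition((shuffleId, partition)) = attemptId

  // Look up the attempt id for a partition; None if nothing registered.
  def attemptFor(shuffleId: Int, partition: Int): Option[Int] =
    attemptByPartition.get((shuffleId, partition))
}
```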

@cloud-fan
Contributor

we achieve this by adding the deterministic tag in both stage and shuffle dependency

Why do we need this tag? For a deterministic map stage, we will never roll it back, right?

@xuanyuanking
Member Author

Why do we need this tag? For a deterministic map stage, we will never roll it back, right?

Yes Wenchen, I also mentioned this just now in the answer to your side question.

@cloud-fan
Contributor

cloud-fan commented Mar 22, 2019

All the block ids should be extended with the attempt id, not only the indeterminate ones.

That's what I expect. My initial idea is to extend the shuffle id with a generation number, so the shuffle id becomes an (int, int) pair instead of a single int. With the shuffle id change, the shuffle block id also changes, as well as other places that refer to the shuffle id, e.g. the map output tracker, and maybe the shuffle file name.

I think it's a much simpler design compared to doing different things for different deterministic levels.
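An illustrative sketch of this alternative design (hypothetical names; this is not code from the PR): a shuffle id that pairs the original int with a generation number bumped on each full-stage retry, so every consumer of the shuffle id distinguishes generations automatically.

```scala
// Sketch of a (shuffleId, generation) pair replacing the single-int
// shuffle id. Each full retry of the map stage bumps the generation,
// so block names from different generations never collide.
case class ShuffleGenerationId(shuffleId: Int, generation: Int) {
  // New id for a rolled-back, fully rerun map stage.
  def nextGeneration: ShuffleGenerationId = copy(generation = generation + 1)

  // The generation flows into every derived name, e.g. shuffle block names.
  def blockName(mapId: Int, reduceId: Int): String =
    s"shuffle_${shuffleId}_${generation}_${mapId}_$reduceId"
}

val first = ShuffleGenerationId(5, 0)
println(first.blockName(1, 2))                // shuffle_5_0_1_2
println(first.nextGeneration.blockName(1, 2)) // shuffle_5_1_1_2
```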

@xuanyuanking
Member Author

Copy that, I'll try this approach in the following commits.

My initial idea is to extend the shuffle id with a generation number, so the shuffle id becomes an (int, int) pair instead of a single int.

I met some problems with this approach: when the shuffleId is created in the shuffle dependency, there's no generation number yet. Maybe we still need to keep the generation number in ShuffleStatus.

@SparkQA

SparkQA commented Mar 28, 2019

Test build #104049 has finished for PR 24110 at commit 62edf82.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member Author

@xuanyuanking xuanyuanking left a comment

@cloud-fan Thanks Wenchen for the guidance; it's a much simpler and cleaner implementation to extend the shuffle block id for all cases.

@SparkQA

SparkQA commented Mar 28, 2019

Test build #104051 has finished for PR 24110 at commit b17eaf3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1231,7 +1234,8 @@ private[spark] class DAGScheduler(
     logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
       s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
     taskScheduler.submitTasks(new TaskSet(
-      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
+      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
Contributor

What about making Stage#nextAttemptId readable and using it instead of stage.latestInfo.attemptNumber?

Member Author

Maybe stage.latestInfo.attemptNumber is better for readability; nextAttemptId = latestInfo.attemptNumber + 1.

@ankuriitg
Contributor

Hi @xuanyuanking, thanks for your work here. Can you please explain how your PR takes care of multiple write attempts? I am especially interested in how it resolves write attempts across two different stage attempts. From the code, I could not see that resolution happening anywhere, but I could be wrong.

@cloud-fan
Contributor

@ankuriitg the general idea is to let the driver (DAGScheduler) decide the shuffle "generation number" when submitting map tasks; then there is no conflict, because different attempts of the shuffle map stage will write shuffle files with different file names. I haven't looked into this PR though.

@ankuriitg
Contributor

Thanks @cloud-fan for your reply. That makes sense, and I saw that you posted that comment earlier as well: #24110 (comment)

But it seems that the current PR doesn't work that way.

@xuanyuanking
Member Author

Sorry for the delay; some other work came up these past days. I'll finish this tomorrow.

@xuanyuanking
Member Author

Can you please explain how your PR takes care of multiple write attempts?

@ankuriitg Thanks for your question. As Wenchen answered, the general idea is to reuse the stage attempt id as the "generation id" for each indeterminate shuffle map task; the output file name is extended with the "generation id" when multiple write attempts happen. You can check the corresponding code in all shuffle writers, e.g. SortShuffleWriter.
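A minimal sketch of the writer-side naming behavior described above (a hypothetical helper, not the code in SortShuffleWriter; the file-name layout is an assumption for illustration):

```scala
// Illustrative only: derive the shuffle data file name for a map task.
// When the task belongs to a retried indeterminate stage, the stage
// attempt id acts as the "generation id" and is appended to the name,
// so different write attempts produce distinct files.
def shuffleDataFileName(
    shuffleId: Int,
    mapId: Int,
    indeterminateAttemptId: Option[Int]): String = {
  indeterminateAttemptId match {
    case Some(attempt) => s"shuffle_${shuffleId}_${mapId}_0_$attempt.data"
    case None          => s"shuffle_${shuffleId}_${mapId}_0.data"
  }
}

println(shuffleDataFileName(0, 3, None))    // shuffle_0_3_0.data
println(shuffleDataFileName(0, 3, Some(1))) // shuffle_0_3_0_1.data
```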

@xuanyuanking
Member Author

xuanyuanking commented Apr 4, 2019

@cloud-fan Per our offline discussion, thanks for your excellent idea of keeping the "generation id" by reusing the task's local properties; that avoids a lot of interface changes. The newest commit f88f9d6 also keeps the behavior of only using the extended block id for a retried indeterminate stage. I think it's ready for review now.

@SparkQA

SparkQA commented Apr 4, 2019

Test build #104265 has finished for PR 24110 at commit 012ae1d.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

xuanyuanking commented Apr 4, 2019

python/run-tests --testnames 'pyspark.sql.tests.test_dataframe DataFrameTests'
The failed Python tests pass locally.

@xuanyuanking
Member Author

retest this please.

 * attempt number while the stage is not determinate and returns None on the contrary.
 */
def indeterminateStageAttemptId(shuffleId: Int): Option[Int] = {
  val id = getLocalProperty(SparkContext.INDETERMINATE_STAGE_ATTEMPT_ID_PREFIX + shuffleId)
Contributor

do we need to include shuffle id?

Member Author

I think we should include the shuffle id here: for an indeterminate shuffle map task whose parent stage is also indeterminate, we need both the parent stage's attempt id and the current one for the shuffle reader and writer. Also, in more complex scenarios, e.g. join/cogroup operations, a single shuffle map stage may have multiple indeterminate parents.
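To illustrate why the key is per shuffle id (the property prefix string and the standalone lookup below are hypothetical; the real code reads TaskContext local properties): one task can carry attempt ids for several indeterminate parent shuffles at once, e.g. a cogroup whose two parents were both retried.

```scala
// Assumed prefix for illustration; the actual constant lives in
// SparkContext.INDETERMINATE_STAGE_ATTEMPT_ID_PREFIX.
val prefix = "spark.indeterminate.stage.attempt.id."

// Simulated local properties of one reduce task whose two parent
// shuffles (ids 3 and 7) were both retried, at attempts 1 and 2.
val props = Map(prefix + "3" -> "1", prefix + "7" -> "2")

// Per-shuffle lookup: Some(attempt) for a retried indeterminate parent,
// None for a determinate (or never-retried) one.
def indeterminateStageAttemptId(shuffleId: Int): Option[Int] =
  props.get(prefix + shuffleId).map(_.toInt)

println(indeterminateStageAttemptId(3)) // Some(1)
println(indeterminateStageAttemptId(9)) // None
```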

Contributor

makes sense

@SparkQA

SparkQA commented Apr 4, 2019

Test build #104275 has finished for PR 24110 at commit 012ae1d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 4, 2019

Test build #104287 has finished for PR 24110 at commit dc527dd.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 9, 2019

Test build #104435 has started for PR 24110 at commit a1e6d0c.

@xuanyuanking
Member Author

retest this please.

@SparkQA

SparkQA commented Apr 11, 2019

Test build #104522 has finished for PR 24110 at commit a1e6d0c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

After #24359, this PR needs a little re-implementation, and the new parameter in TaskSet is no longer needed. I'm working on this and will push a commit soon after manually running the integration test.

@SparkQA

SparkQA commented Apr 22, 2019

Test build #104806 has finished for PR 24110 at commit b8bc611.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

PR #25620 fixes this issue.

@xuanyuanking xuanyuanking deleted the SPARK-25341-3.0 branch September 24, 2019 13:00
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
In the current approach, OneForOneBlockFetcher reuses the OpenBlocks protocol to describe the fetch request for shuffle blocks, which makes extension work for shuffle fetching like apache#19788 and apache#24110 very awkward.
In this PR, we split the fetch request for shuffle blocks out of OpenBlocks into a new protocol named FetchShuffleBlocks. It's loosely bound to ShuffleBlockId and can easily be extended by adding new fields to the protocol.

Existing and newly added UTs.

Closes apache#24565 from xuanyuanking/SPARK-27665.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 8949bc7)