[SPARK-28699][CORE] Fix a corner case for aborting indeterminate stage
Change the logic of collecting indeterminate stages: when handling a FetchFailed event, we should traverse the stages starting from `mapStage`, not from `failedStage`.

In the fetch-failure handling logic, the original code collected indeterminate stages starting from the stage that hit the fetch failure. When the fetch failure occurs in the first task of that stage, this logic causes the indeterminate stage to be resubmitted only partially, which can eventually produce a correctness bug.

With this change, the corner case aborts the indeterminate stage as expected.
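
To make the difference concrete, here is a minimal, self-contained sketch of the rollback collection (simplified stand-ins for the real `DAGScheduler` structures, not Spark code): seeding the traversal with the stage that saw the fetch failure misses the indeterminate map stage itself, while seeding with `mapStage` rolls back the whole dependent chain:

```scala
import scala.collection.mutable.HashSet

// Hypothetical, simplified stage type: each stage only knows its parents.
case class Stage(id: Int, parents: List[Stage])

val map0   = Stage(0, Nil)         // the indeterminate mapStage
val map1   = Stage(1, List(map0))  // failedStage: its task hit FetchFailed
val result = Stage(2, List(map1))  // result stage of the active job

def stagesToRollbackFrom(seed: Stage): HashSet[Stage] = {
  val stagesToRollback = HashSet[Stage](seed)
  // Walk from the leaves; once a chain reaches a stage already marked for
  // rollback, every stage after it in the chain must roll back too.
  def collect(stageChain: List[Stage]): Unit = {
    if (stagesToRollback.contains(stageChain.head)) {
      stageChain.drop(1).foreach(stagesToRollback += _)
    } else {
      stageChain.head.parents.foreach(p => collect(p :: stageChain))
    }
  }
  collect(result :: Nil)
  stagesToRollback
}

// Old behavior: seeding with `failedStage` leaves the indeterminate map
// stage out of the rollback set, so it is only partially resubmitted.
assert(stagesToRollbackFrom(map1) == HashSet(map1, result))
// Fixed behavior: seeding with `mapStage` rolls back the whole chain.
assert(stagesToRollbackFrom(map0) == HashSet(map0, map1, result))
```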

A new unit test was added in DAGSchedulerSuite.
Running the integration test below with `local-cluster[5, 2, 5120]` and `spark.sql.execution.sortBeforeRepartition=false` aborts the indeterminate stage as expected:
```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map { x => (x % 1000, x) }
// Kill an executor in the stage that performs repartition(239).
val df = res.repartition(113).map { x => (x._1 + 1, x._2) }.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```
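
For context on why this reproduces the bug: with `spark.sql.execution.sortBeforeRepartition=false`, a round-robin `repartition` assigns records to output partitions based on the order they arrive in, so a retried map task can send different records to each reducer, and Spark marks that map output INDETERMINATE. The injected `pkill` kills an executor, the lost shuffle blocks trigger a FetchFailed in the first task of the next stage, and the only safe recovery is to abort the indeterminate stage rather than recompute a subset of its partitions.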

Closes apache#25498 from xuanyuanking/SPARK-28699-followup.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 0d3a783)
Signed-off-by: Yuanjian Li <[email protected]>
xuanyuanking committed Aug 20, 2019
1 parent 19f7b74 commit a4d7360
Showing 2 changed files with 7 additions and 24 deletions.
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

```diff
@@ -1370,13 +1370,13 @@ class DAGScheduler(
           // guaranteed to be determinate, so the input data of the reducers will not change
           // even if the map tasks are re-tried.
           if (mapStage.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
-            // It's a little tricky to find all the succeeding stages of `failedStage`, because
+            // It's a little tricky to find all the succeeding stages of `mapStage`, because
             // each stage only know its parents not children. Here we traverse the stages from
             // the leaf nodes (the result stages of active jobs), and rollback all the stages
-            // in the stage chains that connect to the `failedStage`. To speed up the stage
+            // in the stage chains that connect to the `mapStage`. To speed up the stage
             // traversing, we collect the stages to rollback first. If a stage needs to
             // rollback, all its succeeding stages need to rollback to.
-            val stagesToRollback = scala.collection.mutable.HashSet(failedStage)
+            val stagesToRollback = HashSet[Stage](mapStage)
 
             def collectStagesToRollback(stageChain: List[Stage]): Unit = {
               if (stagesToRollback.contains(stageChain.head)) {
```
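
The hunk cuts off inside `collectStagesToRollback`; for orientation, here is a sketch of how that traversal plausibly continues, reconstructed from the surrounding Spark source of that era (treat it as an assumption, not part of this diff):

```scala
// Sketch only: reconstruction of the surrounding source, not shown in the hunk.
def collectStagesToRollback(stageChain: List[Stage]): Unit = {
  if (stagesToRollback.contains(stageChain.head)) {
    // Everything downstream of a stage already marked for rollback
    // must be rolled back as well.
    stageChain.drop(1).foreach(s => stagesToRollback += s)
  } else {
    stageChain.head.parents.foreach { s =>
      collectStagesToRollback(s :: stageChain)
    }
  }
}

// The traversal starts from the result stage of every active job.
activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))
```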
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

```diff
@@ -2527,27 +2527,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0, 0, "ignored"),
       null))
 
-    val failedStages = scheduler.failedStages.toSeq
-    assert(failedStages.length == 2)
-    // Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry.
-    assert(failedStages.collect {
-      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
-    }.head.findMissingPartitions() == Seq(0))
-    // The result stage is still waiting for its 2 tasks to complete
-    assert(failedStages.collect {
-      case stage: ResultStage => stage
-    }.head.findMissingPartitions() == Seq(0, 1))
-
-    scheduler.resubmitFailedStages()
-
-    // The first task of the `shuffleMapRdd2` failed with fetch failure
-    runEvent(makeCompletionEvent(
-      taskSets(3).tasks(0),
-      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
-      null))
-
-    // The job should fail because Spark can't rollback the shuffle map stage.
-    assert(failure != null && failure.getMessage.contains("Spark cannot rollback"))
+    // The second shuffle map stage need to rerun, the job will abort for the indeterminate
+    // stage rerun.
+    assert(failure != null && failure.getMessage
+      .contains("Spark cannot rollback the ShuffleMapStage 1"))
   }
 
   private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
```
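
For orientation (an interpretation, not part of the diff): the asserted text comes from the DAGScheduler abort message for an indeterminate stage that cannot be rolled back, and `ShuffleMapStage 1` is the second shuffle map stage in this test's DAG. Pinning the stage name verifies that the abort is now triggered by the indeterminate `mapStage` itself, rather than surfacing later as a generic rollback failure.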
