
[SPARK-28699][Core] Fix a corner case for aborting indeterminate stage #25498

Closed
wants to merge 2 commits into apache:master from xuanyuanking:SPARK-28699-followup

Conversation

@xuanyuanking (Member) commented Aug 19, 2019

What changes were proposed in this pull request?

When collecting the indeterminate stages while handling FetchFailed, we should look at stages from mapStage instead of failedStage.

Why are the changes needed?

In the FetchFailed error-handling logic, the indeterminate stages were originally collected starting from the failed stage. When the fetch failure happens in the first task of that stage, this causes the indeterminate stage to be resubmitted only partially, which can eventually lead to a correctness bug.
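For illustration, a minimal, self-contained sketch of the rollback collection described above (the `Stage` case class and the three concrete stages are stand-ins invented for this example; the real logic lives in `DAGScheduler.handleTaskCompletion` and may differ in detail):

```
import scala.collection.mutable.HashSet

// Stand-in for Spark's internal Stage; only the parent links matter here.
case class Stage(id: Int, parents: List[Stage] = Nil)

val mapStage = Stage(1)                     // indeterminate stage whose shuffle output was lost
val failedStage = Stage(2, List(mapStage))  // stage that observed the FetchFailed
val resultStage = Stage(3, List(failedStage))

// The fix: seed the rollback set with mapStage instead of failedStage, so the
// indeterminate stage itself is rolled back rather than partially resubmitted.
val stagesToRollback = HashSet[Stage](mapStage)

// Walk from the leaves (the result stages of active jobs) toward the roots;
// once a chain reaches a stage already marked for rollback, every stage
// downstream of it on that chain must be rolled back as well.
def collectStagesToRollback(stageChain: List[Stage]): Unit = {
  if (stagesToRollback.contains(stageChain.head)) {
    stageChain.drop(1).foreach(stagesToRollback += _)
  } else {
    stageChain.head.parents.foreach(s => collectStagesToRollback(s :: stageChain))
  }
}

collectStagesToRollback(List(resultStage))
println(stagesToRollback.map(_.id))  // HashSet(1, 2, 3): all three stages roll back
```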

Does this PR introduce any user-facing change?

It makes the indeterminate stage abort as expected in this corner case.

How was this patch tested?

New UT in DAGSchedulerSuite.
Run the integration test below with `local-cluster[5, 2, 5120]` and `spark.sql.execution.sortBeforeRepartition=false`; it aborts the indeterminate stage as expected:

```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```
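The failure injection is deliberately narrow: only the first task (`partitionId < 1`) of the first attempt (`attemptNumber == 0`, `stageAttemptNumber == 0`) throws, and `"pkill -f -n java".!!` from `scala.sys.process` kills the newest `java` process on the node, i.e. an executor, so a single executor loss triggers the FetchFailed path that exercises the indeterminate-stage handling.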

@cloud-fan (Contributor):

Good catch! LGTM

@SparkQA commented Aug 19, 2019

Test build #109349 has finished for PR 25498 at commit 08bba60.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -2741,27 +2741,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0, 0, "ignored"),
null))

val failedStages = scheduler.failedStages.toSeq
assert(failedStages.length == 2)
Member:

Not a big deal, but I think this assert still applies?

@xuanyuanking (Member, Author):

After this change, failedStages.length == 0, because the cleanup is done in failJobAndIndependentStages via cleanupStateForJobAndIndependentStages.
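For reference, a sketch of the shape the check takes after this change (an assumed form; the exact suite code may differ):

```
// cleanupStateForJobAndIndependentStages (called from
// failJobAndIndependentStages) removes the failed stages once the job is
// aborted, so the suite now expects an empty set instead of two entries.
assert(scheduler.failedStages.isEmpty)
```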

@SparkQA commented Aug 20, 2019

Test build #109378 has finished for PR 25498 at commit 485157a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan closed this in 0d3a783 on Aug 20, 2019
@cloud-fan (Contributor):

Thanks, merging to master!

@xuanyuanking can you send PRs for branch-2.3 and branch-2.4? The code conflicts.

@xuanyuanking (Member, Author):

Sure, I'm doing the backport now.

xuanyuanking added a commit to xuanyuanking/spark that referenced this pull request Aug 20, 2019
Change the logic of collecting the indeterminate stages: when handling FetchFailed, we should look at stages from mapStage, not failedStage.

In the FetchFailed error-handling logic, the indeterminate stages were originally collected starting from the failed stage. When the fetch failure happens in the first task of that stage, this causes the indeterminate stage to be resubmitted only partially, which can eventually lead to a correctness bug.

It makes the indeterminate stage abort as expected in this corner case.

New UT in DAGSchedulerSuite.
Run the integration test below with `local-cluster[5, 2, 5120]` and `spark.sql.execution.sortBeforeRepartition=false`; it aborts the indeterminate stage as expected:
```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes apache#25498 from xuanyuanking/SPARK-28699-followup.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 0d3a783)
Signed-off-by: Yuanjian Li <[email protected]>
@xuanyuanking deleted the SPARK-28699-followup branch on August 20, 2019, 07:38
xuanyuanking added a commit to xuanyuanking/spark that referenced this pull request Aug 20, 2019
@dongjoon-hyun (Member):

Thank you, @xuanyuanking, @cloud-fan and @viirya!
I'm waiting for the backport PRs.

Also, cc @kiszk for branch-2.3.
