
[SPARK-23433][SPARK-25250] [CORE] Later created TaskSet should learn about the finished partitions #23871

Closed
wants to merge 10 commits

Conversation

Ngone51
Member

@Ngone51 Ngone51 commented Feb 22, 2019

What changes were proposed in this pull request?

This is an optional solution for #22806 .

#21131 first implemented the idea that a successfully completed task from a zombie TaskSetManager can also mark the corresponding task as successful in the active TaskSetManager, based on the assumption that an active TaskSetManager always exists for that stage when this happens. But that's not always true: an active TaskSetManager may not have been created yet when a previous task succeeds, which is why #22806 hit the issue.

This PR extends #21131's behavior by adding stageIdToFinishedPartitions to TaskSchedulerImpl, which records the finished partitions whenever a task (from a zombie or active TaskSetManager) succeeds. Thus, a later-created active TaskSetManager can also learn about the finished partitions by looking into stageIdToFinishedPartitions and won't launch any duplicate tasks.
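The bookkeeping described above can be sketched as follows. This is a minimal illustration with hypothetical names (`FinishedPartitionsSketch`, `partitionsToLaunch`), not the actual TaskSchedulerImpl code:

```scala
import scala.collection.mutable.{HashMap, HashSet}

object FinishedPartitionsSketch {
  // stageId -> partition ids already completed by any attempt (zombie or active)
  val stageIdToFinishedPartitions = new HashMap[Int, HashSet[Int]]

  // called whenever a task succeeds, regardless of which attempt ran it
  def markPartitionCompleted(stageId: Int, partitionId: Int): Unit = {
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, new HashSet[Int]) += partitionId
  }

  // a later-created TaskSetManager consults the map and skips finished partitions
  def partitionsToLaunch(stageId: Int, partitions: Seq[Int]): Seq[Int] = {
    val finished = stageIdToFinishedPartitions.getOrElse(stageId, HashSet.empty[Int])
    partitions.filterNot(finished)
  }
}
```

The key property is that the map outlives any individual TaskSetManager, so an attempt created after a completion event still observes it.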

How was this patch tested?

Added a unit test.

@Ngone51
Member Author

Ngone51 commented Feb 22, 2019

ping @squito @cloud-fan @pgandhi999 please have a look at this.

@pgandhi999

@Ngone51 I understand that you had a proposal and we were actively discussing various solutions in the PR #22806 , but I have been working on that PR tirelessly for a few months and we still have an ongoing discussion there. Any specific reason why you created your own PR for the same issue? WDYT @squito @cloud-fan ?

@Ngone51
Member Author

Ngone51 commented Feb 22, 2019

When I reviewed your updated PR, I saw a quite different understanding of my proposal. So I thought it might be better to implement it myself. I also wanted to keep the changes clean and easy for people (not only those already involved) to review, since it is based only on #21131 and doesn't need to mix in other changes. I'm sorry if this bothers you, and I'm fine with crediting this to you. No offence.

@pgandhi999

Ok, keeping that aside for now, I was curious how you tested the changes. I have tested my changes on a really large dataset (approx. 10 TB), where the issue was more easily reproducible and prominent in the event of fetch failures. If you have tested both the issue and the fix with your change, posting screenshots from the UI might help here.

@pgandhi999

ok to test

@squito
Contributor

squito commented Feb 22, 2019

@pgandhi999 I know this can be frustrating, but you shouldn't take offense at this. You both have been putting a lot of effort into this and have contributed a ton. Regardless of whose change is actually merged, the discussion and thought from everyone has driven this forward. Sometimes, when an idea isn't properly understood in plain English, it's just easier to explain it by showing the actual code.

This is one of the thornier problems I've seen; as you've seen, I've made more than one mistake on it myself.

@squito
Contributor

squito commented Feb 22, 2019

Jenkins, add to whitelist

Contributor

@squito squito left a comment

Overall I think this approach makes sense, but let's get more input from others. I did leave some comments on the code, but nothing big.

I will be gone for a week, so I will probably not make any more comments on this for a while.

@@ -208,7 +211,7 @@ private[spark] class TaskSchedulerImpl(
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
ts.taskSet != manager.taskSet && !ts.isZombie
Contributor

Why this change? taskSet should be the same as manager.taskSet, since manager is created from taskSet with val manager = createTaskSetManager(taskSet, maxTaskFailures)


I have the same question: why use manager.taskSet here?

Member Author


Previously, I filtered the already completed tasks in createTaskSetManager() and created a new TaskSet from the filtered tasks. So the TaskSet submitted from the DAGScheduler was not the same as the TaskSet in the TaskSetManager. But that failed plenty of unit tests, since those tests keep a reference to the originally submitted TaskSet for some operations.
I have now moved the filter logic into the TaskSetManager.

val finishedPartitions =
stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new HashSet[Int])
// filter out the tasks which have been finished by previous attempts
val tasks = taskSet.tasks.filterNot { t => finishedPartitions(t.partitionId) }
Contributor

I'd change this comment a bit, to explain why this is necessary, even briefly. Something like
"The DAGScheduler does not always know all the tasks that have been completed by other tasksets when completing a stage, so we do an extra filter here, while holding the TaskSchedulerImpl lock. See SPARK-25250 and markPartitionCompletedInAllTaskSets()."

@@ -1206,6 +1206,67 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
}

test("successful tasks from previous attempts could be learnt by later active taskset") {
Contributor

This test is good, but could it instead be folded into "Completions in zombie tasksets update status of non-zombie taskset"? That would make it easier to maintain. (If you think there is a reason it needs to be separate, feel free to push back.)

Member Author

Good idea!

assert(taskScheduler.stageIdToFinishedPartitions(0).contains(1))

// submit a new taskset with 10 tasks after some previous task attempt has succeeded
val attempt1 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1)
Contributor

Also be sure to expand the comment here about why this is an important test case -- it simulates the race between the DAGScheduler event loop and the task-result-getter threads, where a taskset gets submitted with already-completed tasks.

* partitions for that stage here and exclude the already finished tasks when we creating active
* TaskSetManagers later by looking into stageIdToFinishedPartitions. Thus, active TaskSetManager
* could be always notified about the finished partitions whether it has been created or not at
* the time we call this method.
Contributor

This comment is good, but a little too wordy. I'd cut it down to something like:

There is also the possibility that the DAGScheduler submits another taskset at the same time as we're marking a task completed here -- that taskset would have a task for a partition that was already completed. We maintain the set of finished partitions in stageIdToFinishedPartitions, protected by this, so we can filter those tasks when the taskset is submitted. See SPARK-25250 for more details.

Contributor

And can you also add a comment that this should only be called with a lock on this?

Member Author

updated.

@@ -96,6 +96,9 @@ private[spark] class TaskSchedulerImpl(
private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
val taskIdToExecutorId = new HashMap[Long, String]

// Protected by `this`
private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, HashSet[Int]]
Contributor

You can use a HashMap[Int, BitSet] here to save some memory.
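The suggestion can be illustrated with scala.collection.mutable.BitSet (Spark actually ended up using its own BitSet utility from org.apache.spark.util.collection; the standard-library one is used here only as an illustration). Since partition ids are small, dense integers, a bit set costs roughly one bit per partition instead of a boxed entry per element:

```scala
import scala.collection.mutable.{BitSet, HashMap}

val stageIdToFinishedPartitions = new HashMap[Int, BitSet]

// recording a finished partition sets a single bit in the stage's bit set
stageIdToFinishedPartitions.getOrElseUpdate(0, new BitSet) += 7
stageIdToFinishedPartitions.getOrElseUpdate(0, new BitSet) += 2
```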

@pgandhi999

@squito That is indeed a good point, and well taken. I also ran @Ngone51 's code for testing, and the tests look good. I shall continue reviewing the PR in the meantime. Thank you.

@@ -297,6 +306,7 @@ private[spark] class TaskSchedulerImpl(
taskSetsForStage -= manager.taskSet.stageAttemptId
if (taskSetsForStage.isEmpty) {
taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
stageIdToFinishedPartitions -= manager.taskSet.stageId


Should we also add this code when calling killTasks?

Member Author

I didn't find a killTasks method in TaskSchedulerImpl, and for similar functions, e.g. cancelTasks, I think it's unnecessary.

@srowen
Member

srowen commented Feb 22, 2019

I get where you're coming from @pgandhi999 . I'd read this as part of one larger discussion, and indeed it happens sometimes that the easiest way to consider an alternative is to take a look at it concretely. I don't read this as a hijack or anything.

I haven't read the long discussion on the other PR but can see it's a tricky change and you have a couple people with deep experience working on it with you to get it right. It may be that you cross-pollinate ideas between the two PRs. It doesn't so much matter which one gets merged as long as it's a consensus and the best we can come up with.

If the issue is credit, then obviously anyone who reads these knows you've driven a lot of the discussion on this important change, and that this is based on your change.

@SparkQA

SparkQA commented Feb 22, 2019

Test build #102671 has finished for PR 23871 at commit f132194.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 23, 2019

Test build #102711 has finished for PR 23871 at commit ee1f5df.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Member Author

Ngone51 commented Feb 23, 2019

@pgandhi999 Thanks for your understanding and helping test this, appreciate a lot.

@Ngone51
Member Author

Ngone51 commented Feb 23, 2019

@pgandhi999 @squito Thank you for review, have updated.

@SparkQA

SparkQA commented Feb 23, 2019

Test build #102712 has finished for PR 23871 at commit 76bb765.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// holding the TaskSchedulerImpl lock.
// See SPARK-25250 and markPartitionCompletedInAllTaskSets()`
sched.stageIdToFinishedPartitions
.getOrElseUpdate(taskSet.stageId, new BitSet)
Contributor

We can avoid creating an empty bit set:

sched.stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
  finishedPartitions => finishedPartitions.foreach(markPartitionCompleted(_, None))
}

Member Author

good idea!

*/
private[scheduler] def markPartitionCompletedInAllTaskSets(
stageId: Int,
partitionId: Int,
taskInfo: TaskInfo) = {
taskInfo: Option[TaskInfo]) = {
Contributor

@cloud-fan cloud-fan Feb 26, 2019

seems no one calls this method with taskInfo = None

Member Author

good catch!

@cloud-fan
Contributor

can we address #21131 (comment) as well?

@Ngone51
Member Author

Ngone51 commented Feb 26, 2019

can we address #21131 (comment) as well?

I revisited the discussion and code, and I think we cannot remove maybeFinishTaskSet() there. Consider this case:

Stage 0 has 5 partitions, and currently there's an active TSM 0.1 and a zombie TSM 0.0 in the TaskScheduler. TSM 0.1 has finished partitions [0, 1, 2, 3] and has no running tasks (it may be blocked by resource offers or delay scheduling) at the moment a successful task for partition 4 from TSM 0.0 finishes. Thus, we set successful[4]=true for TSM 0.1 by calling markPartitionCompleted. So, if we do not call maybeFinishTaskSet inside markPartitionCompleted, TSM 0.1 can never finish, because it has no tasks left to launch, and maybeFinishTaskSet would never be called from anywhere else.

WDYT ? @cloud-fan
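The scenario above can be captured in a toy model (hypothetical class SketchTsm, not the real TaskSetManager): once the zombie attempt completes the active manager's last pending partition, only the call inside markPartitionCompleted can finish the task set, because the manager has no running tasks and nothing left to launch.

```scala
// toy stand-in for TaskSetManager; `finished` plays the role of the TSM
// completing itself via maybeFinishTaskSet()
class SketchTsm(numPartitions: Int) {
  val successful = new Array[Boolean](numPartitions)
  var runningTasks = 0
  var finished = false

  def maybeFinishTaskSet(): Unit = {
    if (runningTasks == 0 && successful.forall(identity)) finished = true
  }

  def markPartitionCompleted(partitionId: Int): Unit = {
    successful(partitionId) = true
    // without this call, nothing else would ever finish this task set:
    // it has no running tasks and no tasks left to launch
    maybeFinishTaskSet()
  }
}
```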

@cloud-fan
Contributor

makes sense, since markPartitionCompleted is called in more than one place now.

@SparkQA

SparkQA commented Feb 27, 2019

Test build #102794 has finished for PR 23871 at commit c53961f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// holding the TaskSchedulerImpl lock.
// See SPARK-25250 and markPartitionCompletedInAllTaskSets()`
sched.stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
finishedPartitions => finishedPartitions.foreach(markPartitionCompleted(_, None))
Contributor

How do we guarantee that there is no race condition between the creation of the TaskSetManager and the updating of TaskSchedulerImpl.stageIdToFinishedPartitions?

Member Author

We always hold a lock on TaskSchedulerImpl while accessing stageIdToFinishedPartitions:

  • TaskSchedulerImpl.submitTasks

  • TaskSchedulerImpl.handleSuccessfulTask

  • TaskSchedulerImpl.taskSetFinished
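The locking discipline listed above can be sketched as follows (illustrative SchedulerSketch class, not the real TaskSchedulerImpl): every access goes through a this.synchronized block, so a TaskSetManager created in submitTasks always observes every completion already recorded by handleSuccessfulTask.

```scala
import scala.collection.mutable

class SchedulerSketch {
  // protected by `this`, like stageIdToFinishedPartitions in the PR
  private val stageIdToFinishedPartitions = new mutable.HashMap[Int, mutable.BitSet]

  // TaskSetManager creation reads the map under the same lock ...
  def submitTasks(stageId: Int): Set[Int] = this.synchronized {
    stageIdToFinishedPartitions.getOrElse(stageId, mutable.BitSet.empty).toSet
  }

  // ... as the task-result path that writes to it, so the two cannot race
  def handleSuccessfulTask(stageId: Int, partitionId: Int): Unit = this.synchronized {
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, new mutable.BitSet) += partitionId
  }
}
```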

Contributor

Ah, I see: a TaskSetManager is always created in TaskSchedulerImpl.submitTasks.

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented Feb 27, 2019

Test build #102814 has finished for PR 23871 at commit 0b41d86.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@squito squito left a comment

Just some very minor style stuff, otherwise lgtm

// been completed by other tasksets when completing a stage, so we mark
// those tasks as finished here to avoid launching duplicate tasks, while
// holding the TaskSchedulerImpl lock.
// See SPARK-25250 and markPartitionCompletedInAllTaskSets()`
Contributor

super nit: run-on sentence and stray '`'.

@@ -797,16 +801,20 @@ private[spark] class TaskSetManager(
maybeFinishTaskSet()
}

private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: Option[TaskInfo])
: Unit = {
Contributor

super nit: I'm pretty sure that even if just the return type goes over the line, our style is to switch to multi-line definition

@@ -1133,30 +1133,51 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(tsm.runningTasks === 9)
tsm
}
// we've now got 2 zombie attempts, each with 9 tasks still active but zero active attempt
// in taskScheduler.
Contributor

I don't understand "zero active attempt in taskScheduler", can you explain what you mean?

Member Author

Hmm, I mean "there's no active attempt in taskScheduler", updated.

// in taskScheduler.

// finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
// And it's possible since the behaviour of submitting new attempt and handling successful task
Contributor

This is possible since ...

val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
taskScheduler.submitTasks(finalAttempt)
val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
// Though, finalTsm gets submitted after some tasks succeeds, but it could also know about the
// finished partition by looking into `stageIdToFinishedPartitions` when it is being created,
// so that it won't launch any duplicate tasks later.
Contributor

I'd reword this to

Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should realize that 2 tasks have already completed, and mark them appropriately, so it won't launch any duplicate tasks later (SPARK-25250).

@@ -124,7 +124,8 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
}
}

override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
override def taskSetFinished(manager: TaskSetManager)
: Unit = finishedManagers += manager
Contributor

If this really needs to be multiline (though I don't think it does), be sure to use a multi-line def and wrap the body in braces.

@Ngone51
Member Author

Ngone51 commented Mar 6, 2019

@squito Thanks for your comments, have updated.

@SparkQA

SparkQA commented Mar 6, 2019

Test build #103075 has finished for PR 23871 at commit 989c9d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@squito squito left a comment

lgtm

asfgit pushed a commit that referenced this pull request Mar 6, 2019
…bout the finished partitions

## What changes were proposed in this pull request?

This is an optional solution for #22806 .

#21131 first implemented the idea that a successfully completed task from a zombie TaskSetManager can also mark the corresponding task as successful in the active TaskSetManager, based on the assumption that an active TaskSetManager always exists for that stage when this happens. But that's not always true: an active TaskSetManager may not have been created yet when a previous task succeeds, which is why #22806 hit the issue.

This PR extends #21131's behavior by adding `stageIdToFinishedPartitions` to TaskSchedulerImpl, which records the finished partitions whenever a task (from a zombie or active TaskSetManager) succeeds. Thus, a later-created active TaskSetManager can also learn about the finished partitions by looking into `stageIdToFinishedPartitions` and won't launch any duplicate tasks.

## How was this patch tested?

Added a unit test.

Closes #23871 from Ngone51/dev-23433-25250.

Lead-authored-by: wuyi <[email protected]>
Co-authored-by: Ngone51 <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
(cherry picked from commit e5c6143)
Signed-off-by: Imran Rashid <[email protected]>
@asfgit asfgit closed this in e5c6143 Mar 6, 2019
@squito
Contributor

squito commented Mar 6, 2019

Merged to master & 2.4. @Ngone51, there was a merge conflict against branch 2.3; would you like to open another PR for that branch?

Thanks for all the help from everyone on this. @pgandhi999, as a way to split credit, I assigned the JIRA to you. I definitely appreciate all the work you put in on it.

asfgit pushed a commit that referenced this pull request Mar 6, 2019
…a stage

## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to the DAGScheduler. This is necessary because, when the DAGScheduler gets the task completion event and it's for the last partition, the stage is finished. However, if it's a shuffle stage and it has missing map outputs, the DAGScheduler will resubmit it (see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM for a stage, and fails.

This fix has a hole: let's say a stage has 10 partitions and 2 task set managers: TSM1 (zombie) and TSM2 (active). TSM1 has a running task for partition 10, and it completes. TSM2 finishes tasks for partitions 1-9 and thinks it is still active because it hasn't finished partition 10 yet. However, the DAGScheduler gets task completion events for all 10 partitions and thinks the stage is finished. Then the same problem occurs: the DAGScheduler may resubmit the stage and cause the more-than-one-active-TSM error.

#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so it can mark itself as zombie after partitions 1-9 are completed.

However, #21131 still has a hole: TSM2 may be created after the task from TSM1 has completed. Then TSM2 can't get notified about the task completion, which leads to the more-than-one-active-TSM error.

#22806 and #23871 were created to fix this hole. However, the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix, which is easy to backport: mark all existing task set managers as zombies when trying to create a new task set manager.

After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and to fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250). #22806 and #23871 are its followups to fix the hole.
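The simple fix described in this commit message can be sketched as follows (hypothetical names Tsm and submitTaskSet, not the actual TaskSchedulerImpl.submitTasks code): creating a new manager first marks every existing manager for the same stage as a zombie.

```scala
import scala.collection.mutable.HashMap

class Tsm(val stageAttemptId: Int) { var isZombie = false }

val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Tsm]]

def submitTaskSet(stageId: Int, attemptId: Int): Tsm = {
  val attempts = taskSetsByStageIdAndAttempt.getOrElseUpdate(stageId, new HashMap[Int, Tsm])
  // mark all existing attempts of this stage as zombie before the new one starts
  attempts.values.foreach(_.isZombie = true)
  val tsm = new Tsm(attemptId)
  attempts(attemptId) = tsm
  tsm
}
```

This guarantees at most one non-zombie manager per stage by construction, regardless of event ordering.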

## How was this patch tested?

existing tests.

Closes #23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <[email protected]>
@pgandhi999

Thank you @squito . Once again, I appreciate all the work that you guys do, and I am happy to know that we all learned something by the end of it.

@Ngone51
Member Author

Ngone51 commented Mar 7, 2019

Thank you @pgandhi999 @squito @cloud-fan @srowen .

I'll submit a pr to help backport to 2.3 later, @squito .

cloud-fan added a commit that referenced this pull request Apr 29, 2019
…eption many times

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-25250 reports a bug that, a task which is failed with `CommitDeniedException` gets retried many times.

This can happen when a stage has 2 task set managers: one zombie and one active. A task from the zombie TSM completes and commits to a central coordinator (assuming it's a file-writing task). Then the corresponding task from the active TSM will fail with `CommitDeniedException`. `CommitDeniedException.countTowardsTaskFailures` is false, so the active TSM will keep retrying this task until the job finishes. This wastes a lot of resources.

#21131 first implemented the idea that a previously completed task from a zombie `TaskSetManager` can mark the task of the same partition as completed in the active `TaskSetManager`. Later, #23871 improved the implementation to cover a corner case where an active `TaskSetManager` hasn't been created yet when a previous task succeeds.

However, #23871 has a bug and was reverted in #24359. In hindsight, #23871 is fragile because we need to sync the states between `DAGScheduler` and `TaskScheduler` about which partitions are completed.

This PR proposes a new fix:
1. When `DAGScheduler` gets a task success event from an earlier attempt, notify the `TaskSchedulerImpl` about it
2. When `TaskSchedulerImpl` knows a partition is already completed, ask the active `TaskSetManager` to mark the corresponding task as finished, if the task is not finished yet.

This fix covers the corner case, because:
1. If `DAGScheduler` gets the task completion event from zombie TSM before submitting the new stage attempt, then `DAGScheduler` knows that this partition is completed, and it will exclude this partition when creating task set for the new stage attempt. See `DAGScheduler.submitMissingTasks`
2. If `DAGScheduler` gets the task completion event from zombie TSM after submitting the new stage attempt, then the active TSM is already created.

Compared to the previous fix, the message loop becomes longer, so it's likely that the active task set manager has already retried the task multiple times. But this failure window won't be too big, and we want to avoid the worst case of retrying the task many times until the job finishes. So this solution is acceptable.
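The two steps above can be sketched as a push-style message flow (all names here are illustrative; the real change spans DAGScheduler and TaskSchedulerImpl):

```scala
import scala.collection.mutable

// toy active TSM: the scheduler can ask it to mark a task as finished
class ActiveTsm(numPartitions: Int) {
  val successful = new Array[Boolean](numPartitions)
}

class SchedulerFlowSketch {
  val completed = new mutable.HashMap[Int, mutable.BitSet]
  var activeTsm: Option[ActiveTsm] = None

  // step 1: the DAGScheduler notifies the scheduler of a success from an earlier attempt
  def notePartitionCompleted(stageId: Int, partitionId: Int): Unit = {
    completed.getOrElseUpdate(stageId, new mutable.BitSet) += partitionId
    // step 2: ask the active TSM (if any) to mark the corresponding task finished
    activeTsm.foreach(_.successful(partitionId) = true)
  }
}
```

In contrast to the reverted fix, the scheduler pushes completions to the active manager on notification rather than having a newly created manager pull them, which removes the creation-time race.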

## How was this patch tested?

a new test case.

Closes #24375 from cloud-fan/fix2.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
…bout the finished partitions

## What changes were proposed in this pull request?

This is an optional solution for apache#22806 .

apache#21131 first implemented that a successfully completed task from a zombie TaskSetManager could also mark the task for the same partition as succeeded in the active TaskSetManager, based on the assumption that an active TaskSetManager always exists for that stage when this happens. But that's not always true: an active TaskSetManager may not have been created yet when a task from an earlier attempt succeeds, which is why apache#22806 hit the issue.

This PR extends apache#21131's behavior by adding `stageIdToFinishedPartitions` to TaskSchedulerImpl, which records the finished partitions whenever a task (from a zombie or active TaskSetManager) succeeds. Thus, a later created active TaskSetManager can also learn about the finished partitions by looking into `stageIdToFinishedPartitions` and won't launch any duplicate tasks.
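A minimal sketch of this bookkeeping (an assumed shape, not the exact Spark code — `SchedulerState`, `taskSucceeded` and `pendingPartitions` are hypothetical names standing in for the scheduler internals):

```scala
import scala.collection.mutable

object SchedulerState {
  // stageId -> partitions that already have a successful task, across all attempts
  val stageIdToFinishedPartitions = mutable.Map.empty[Int, mutable.BitSet]

  // Record a success from any attempt, zombie or active.
  def taskSucceeded(stageId: Int, partition: Int): Unit =
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, mutable.BitSet.empty) += partition

  // Partitions a newly created TaskSetManager still needs to run: everything
  // except what some earlier attempt already finished.
  def pendingPartitions(stageId: Int, allPartitions: Seq[Int]): Seq[Int] = {
    val done = stageIdToFinishedPartitions.getOrElse(stageId, mutable.BitSet.empty)
    allPartitions.filterNot(done.contains)
  }
}
```

Because the record is keyed by stage rather than by task set manager, it survives the gap between a zombie attempt's success and the creation of the next attempt's manager, which is exactly the corner case this PR targets.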

## How was this patch tested?

Added new test cases.

Closes apache#23871 from Ngone51/dev-23433-25250.

Lead-authored-by: wuyi <[email protected]>
Co-authored-by: Ngone51 <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
(cherry picked from commit e5c6143)
Signed-off-by: Imran Rashid <[email protected]>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
…a stage

## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

apache#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to the DAGScheduler. This is necessary because, when the DAGScheduler gets the task completion event for the last partition, the stage is finished. However, if it's a shuffle stage and it has missing map outputs, the DAGScheduler will resubmit it (see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM for a stage, and a failure.

This fix has a hole. Let's say a stage has 10 partitions and 2 task set managers: TSM1 (zombie) and TSM2 (active). TSM1 has a running task for partition 10, and it completes. TSM2 finishes the tasks for partitions 1-9 and thinks it is still active because it hasn't finished partition 10 yet. However, the DAGScheduler gets task completion events for all 10 partitions and thinks the stage is finished. Then the same problem occurs: the DAGScheduler may resubmit the stage and cause the more-than-one-active-TSM error.

apache#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so it can mark itself as zombie after partitions 1-9 are completed.

However, apache#21131 still has a hole: TSM2 may be created after the task from TSM1 has completed. Then TSM2 can't get notified about the task completion, which leads to the more-than-one-active-TSM error.

apache#22806 and apache#23871 were created to fix this hole. However, the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix, which is easy to backport: mark all existing task set managers as zombie when creating a new task set manager.

After this PR, apache#21131 is still necessary to avoid launching unnecessary tasks and to fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250). apache#22806 and apache#23871 are its follow-ups to fix the hole.
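The "mark all existing managers as zombie" idea above can be sketched as follows (a hypothetical minimal model; `Scheduler` and `Tsm` are illustrative names, not the Spark classes):

```scala
import scala.collection.mutable

// Each manager only tracks whether it is zombie, which is all this sketch needs.
class Tsm(val stageAttemptId: Int) {
  var isZombie = false
}

class Scheduler {
  private val managersByStage = mutable.Map.empty[Int, mutable.ArrayBuffer[Tsm]]

  // Creating a new attempt first marks every existing manager for the stage as
  // zombie, so at most one active TSM can exist per stage at any time.
  def createTaskSetManager(stageId: Int, attemptId: Int): Tsm = {
    val existing = managersByStage.getOrElseUpdate(stageId, mutable.ArrayBuffer.empty[Tsm])
    existing.foreach(_.isZombie = true)
    val tsm = new Tsm(attemptId)
    existing += tsm
    tsm
  }
}
```

The invariant is enforced at creation time rather than by synchronizing completion state, which is why this variant is small enough to backport.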

## How was this patch tested?

existing tests.

Closes apache#23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <[email protected]>
sumwale pushed a commit to TIBCOSoftware/snappy-spark that referenced this pull request Jun 27, 2021
…a stage

otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…eption many times

Ref: LIHADOOP-53705

https://issues.apache.org/jira/browse/SPARK-25250 reports a bug where a task that failed with `CommitDeniedException` gets retried many times.

This can happen when a stage has 2 task set managers: one zombie, one active. A task from the zombie TSM completes and commits to a central coordinator (assuming it's a file-writing task). Then the corresponding task from the active TSM will fail with `CommitDeniedException`. `CommitDeniedException.countTowardsTaskFailures` is false, so the active TSM will keep retrying this task until the job finishes. This wastes a lot of resources.

However, apache#23871 has a bug and was reverted in apache#24359. With hindsight, apache#23871 is fragile because we need to keep the states between `DAGScheduler` and `TaskScheduler` in sync about which partitions are completed.

This PR proposes a new fix:
1. When `DAGScheduler` gets a task success event from an earlier attempt, notify the `TaskSchedulerImpl` about it
2. When `TaskSchedulerImpl` knows a partition is already completed, ask the active `TaskSetManager` to mark the corresponding task as finished, if the task is not finished yet.

This fix covers the corner case, because:
1. If `DAGScheduler` gets the task completion event from the zombie TSM before submitting the new stage attempt, then `DAGScheduler` knows that this partition is completed, and it will exclude this partition when creating the task set for the new stage attempt. See `DAGScheduler.submitMissingTasks`
2. If `DAGScheduler` gets the task completion event from the zombie TSM after submitting the new stage attempt, then the active TSM is already created.

Compared to the previous fix, the message loop becomes longer, so it's likely that the active task set manager has already retried the task multiple times. But this failure window won't be too big, and we want to avoid the worst case of retrying the task many times until the job finishes. So this solution is acceptable.

a new test case.

Closes apache#24375 from cloud-fan/fix2.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

RB=2113301
BUG=LIHADOOP-53705
G=spark-reviewers
R=chsingh
A=chsingh