[SPARK-23433][SPARK-25250] [CORE] Later created TaskSet should learn about the finished partitions #23871
Conversation
ping @squito @cloud-fan @pgandhi999 please have a look at this.
@Ngone51 I understand that you had a proposal and we were actively discussing various solutions in PR #22806, but I have been working on that PR tirelessly for a few months and we still have an ongoing discussion there. Is there a specific reason why you created your own PR for the same issue? WDYT @squito @cloud-fan ?
I saw quite a different understanding when I reviewed your updated PR corresponding to my proposal, so I thought it might be better to implement it myself. I also wanted to keep the changes clean and easy for people (not only those already involved) to review, since it just builds on #21131 and doesn't need to mix in other changes. I'm sorry if you do care about this, and I'm fine with crediting it to you. No offence.
OK, keeping that aside for now, I was curious how you tested the changes. I have tested my changes on a really large dataset (approx. 10 TB) where the issue was more easily reproducible and prominent in the event of fetch failures. If you have tested both the issue and the fix with your change, posting screenshots from the UI might help here.
ok to test
@pgandhi999 I know this can be frustrating, but you shouldn't take offense at this. You both have been putting a lot of effort into this and have contributed a ton. Regardless of whose change is actually merged, the discussion and thought from everyone has driven this forward. Sometimes, when an idea isn't properly understood in plain English, it's just easier to explain it by showing the actual code. This is one of the thornier problems I've seen; as you've noticed, I've made more than one mistake on it myself.
Jenkins, add to whitelist
overall I think this approach makes sense, but lets get more input from others. I did leave some comments on the code, but nothing big.
I will be gone for a week, so I will probably not make any more comments on this for a while.
@@ -208,7 +211,7 @@ private[spark] class TaskSchedulerImpl(
       taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
     stageTaskSets(taskSet.stageAttemptId) = manager
     val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
-      ts.taskSet != taskSet && !ts.isZombie
+      ts.taskSet != manager.taskSet && !ts.isZombie
why this change? `taskSet` should be the same as `manager.taskSet`, as `manager` is created from `taskSet` with `val manager = createTaskSetManager(taskSet, maxTaskFailures)`
I have the same question: why use `manager.taskSet` here?
Previously, I filtered the already completed tasks in `createTaskSetManager()` and created a new TaskSet from the filtered tasks. So the submitted TaskSet from the DAGScheduler was not the same as the TaskSet in the TaskSetManager. But that failed plenty of unit tests, since those tests keep the original submitted TaskSet for some operations. Now I have moved the filter logic into the TaskSetManager.
    val finishedPartitions =
      stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new HashSet[Int])
    // filter the task which has been finished by previous attempts
    val tasks = taskSet.tasks.filterNot{ t => finishedPartitions(t.partitionId) }
I'd change this comment a bit, to explain why this is necessary, even briefly. Something like
"The DAGScheduler does not always know all the tasks that have been completed by other tasksets when completing a stage, so we do an extra filter here, while holding the TaskSchedulerImpl lock. See SPARK-25250 and markPartitionCompletedInAllTaskSets()`
@@ -1206,6 +1206,67 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
    }
  }

+  test("successful tasks from previous attempts could be learnt by later active taskset") {
this test is good, but could it instead be folded into "Completions in zombie tasksets update status of non-zombie taskset"? that would make it easier to maintain (if you think there is a reason it needs to be separate, feel free to push back)
Good idea!
    assert(taskScheduler.stageIdToFinishedPartitions(0).contains(1))

    // submit a new taskset with 10 tasks after some previous task attempt succeeded
    val attempt1 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1)
also be sure to expand the comment here, about why this is an important test case -- it simulates the race between the DAGScheduler event loop and the task-result-getter threads, where a taskset gets submitted with already complete tasks.
 * partitions for that stage here and exclude the already finished tasks when we creating active
 * TaskSetManagers later by looking into stageIdToFinishedPartitions. Thus, active TaskSetManager
 * could be always notified about the finished partitions whether it has been created or not at
 * the time we call this method.
This comment is good, but a little too wordy. I'd cut it down to something like:
There is also the possibility that the DAGScheduler submits another taskset at the same time as we're marking a task completed here -- that taskset would have a task for a partition that was already completed. We maintain the set of finished partitions in stageIdToFinishedPartitions, protected by this, so we can filter those tasks when the taskset is submitted. See SPARK-25250 for more details.
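To complement the suggested wording, here is a toy sketch (simplified types, not Spark's code) of the two jobs this bookkeeping does: remember the partition for attempts that don't exist yet, and notify the attempts that already do:

```scala
import scala.collection.mutable.{BitSet, HashMap}

// Toy stand-in for a TaskSetManager: it only tracks which of its partitions are done.
class TsmSketch(val partitions: Set[Int]) {
  val completed = new BitSet
  def markPartitionCompleted(p: Int): Unit = if (partitions(p)) completed += p
}

class SchedulerBookkeepingSketch {
  val taskSetsByStageId = new HashMap[Int, Seq[TsmSketch]]
  val stageIdToFinishedPartitions = new HashMap[Int, BitSet]

  // In the real code this is only called while holding the TaskSchedulerImpl lock.
  def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int): Unit = synchronized {
    // 1) Record the partition so a TaskSetManager created *later* can still find out.
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, new BitSet) += partitionId
    // 2) Notify every attempt that already exists for this stage.
    taskSetsByStageId.getOrElse(stageId, Nil).foreach(_.markPartitionCompleted(partitionId))
  }
}
```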
And can you also add a comment that this should only be called with a lock on `this`?
updated.
@@ -96,6 +96,9 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]

+  // Protected by `this`
+  private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, HashSet[Int]]
you can use a `HashMap[Int, BitSet]` here to save some memory.
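For illustration, the suggested change is essentially a drop-in replacement of the value type (assuming partition ids are small, dense, non-negative ints, which is what makes a bitset compact):

```scala
import scala.collection.mutable.{BitSet, HashMap, HashSet}

object BitSetVsHashSet {
  def main(args: Array[String]): Unit = {
    // A HashSet[Int] boxes every element and pays per-entry hash-table overhead,
    // while a BitSet stores one bit per id, which is much more compact for the
    // small, dense partition ids used here.
    val asHashSet = new HashMap[Int, HashSet[Int]]
    val asBitSet = new HashMap[Int, BitSet]

    asHashSet.getOrElseUpdate(0, new HashSet[Int]) += 7
    asBitSet.getOrElseUpdate(0, new BitSet) += 7
    println(asBitSet(0).contains(7)) // true
  }
}
```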
@@ -297,6 +306,7 @@ private[spark] class TaskSchedulerImpl(
        taskSetsForStage -= manager.taskSet.stageAttemptId
        if (taskSetsForStage.isEmpty) {
          taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
+         stageIdToFinishedPartitions -= manager.taskSet.stageId
Should we also add this code when calling `killTasks`?
I didn't find a method `killTasks` in TaskSchedulerImpl, and for similar functions, e.g. `cancelTasks`, I think it's unnecessary.
I get where you're coming from @pgandhi999. I'd read this as part of one larger discussion, and indeed it sometimes happens that the easiest way to consider an alternative is to look at it concretely. I don't read this as a hijack or anything. I haven't read the long discussion on the other PR, but I can see it's a tricky change, and you have a couple of people with deep experience working on it with you to get it right. It may be that you cross-pollinate ideas between the two PRs. It doesn't matter so much which one gets merged, as long as it's a consensus and the best we can come up with. If the issue is credit, then obviously anyone who reads these threads knows you've driven a lot of the discussion on this important change, and this PR is based on your change.
Test build #102671 has finished for PR 23871 at commit
Test build #102711 has finished for PR 23871 at commit
@pgandhi999 Thanks for your understanding and for helping test this; I appreciate it a lot.
@pgandhi999 @squito Thank you for the review; I have updated.
Test build #102712 has finished for PR 23871 at commit
      // holding the TaskSchedulerImpl lock.
      // See SPARK-25250 and markPartitionCompletedInAllTaskSets()`
      sched.stageIdToFinishedPartitions
        .getOrElseUpdate(taskSet.stageId, new BitSet)
we can avoid creating an empty bit set:
sched.stageIdToFinishedPartitions.get(taskSet.stageId).foreach { finishedPartitions =>
  finishedPartitions.foreach(markPartitionCompleted(_, None))
}
good idea!
   */
  private[scheduler] def markPartitionCompletedInAllTaskSets(
      stageId: Int,
      partitionId: Int,
-     taskInfo: TaskInfo) = {
+     taskInfo: Option[TaskInfo]) = {
seems no one calls this method with taskInfo = None
good catch!
can we address #21131 (comment) as well?
I revisited the discussion and code, and I think maybe we could not remove it. Consider: Stage 0 has 5 partitions, and there's an active TSM 0.1 and a zombie TSM 0.0 in the TaskScheduler currently. TSM 0.1 has finished partitions [0, 1, 2, 3] and has no running tasks (maybe blocked by resource offers or delay scheduling) at the moment a succeeded task from TSM 0.0 finishes. Thus, we have ... WDYT? @cloud-fan
makes sense, since ...
Test build #102794 has finished for PR 23871 at commit
      // holding the TaskSchedulerImpl lock.
      // See SPARK-25250 and markPartitionCompletedInAllTaskSets()`
      sched.stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
        finishedPartitions => finishedPartitions.foreach(markPartitionCompleted(_, None))
How do we guarantee that there is no race condition between the creation of `TaskSetManager` and the updating of `TaskSchedulerImpl.stageIdToFinishedPartitions`?
we always hold a lock on `TaskSchedulerImpl` while accessing `stageIdToFinishedPartitions`:
- `TaskSchedulerImpl.submitTasks`
- `TaskSchedulerImpl.handleSuccessfulTask`
- `TaskSchedulerImpl.taskSetFinished`
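A minimal sketch (toy class, not the real `TaskSchedulerImpl`) of that locking discipline: each of the three paths above touches the map only while synchronized on the scheduler, which is why a TaskSetManager can't be created concurrently with an update it would miss:

```scala
import scala.collection.mutable.{BitSet, HashMap}

class LockingSketch {
  private val stageIdToFinishedPartitions = new HashMap[Int, BitSet]

  // Mirrors TaskSchedulerImpl.submitTasks: read under the lock, skip finished partitions.
  def submitTasks(stageId: Int, partitions: Seq[Int]): Seq[Int] = this.synchronized {
    val finished = stageIdToFinishedPartitions.getOrElse(stageId, new BitSet)
    partitions.filterNot(finished.contains)
  }

  // Mirrors TaskSchedulerImpl.handleSuccessfulTask: write under the lock.
  def handleSuccessfulTask(stageId: Int, partitionId: Int): Unit = this.synchronized {
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, new BitSet) += partitionId
  }

  // Mirrors TaskSchedulerImpl.taskSetFinished: clean up under the lock.
  def taskSetFinished(stageId: Int): Unit = this.synchronized {
    stageIdToFinishedPartitions -= stageId
  }
}
```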
ah I see, `TaskSetManager` is always created in `TaskSchedulerImpl.submitTasks`
LGTM
Test build #102814 has finished for PR 23871 at commit
Just some very minor style stuff, otherwise lgtm
      // been completed by other tasksets when completing a stage, so we mark
      // those tasks as finished here to avoid launching duplicate tasks, while
      // holding the TaskSchedulerImpl lock.
      // See SPARK-25250 and markPartitionCompletedInAllTaskSets()`
super nit: run-on sentence and stray '`'.
@@ -797,16 +801,20 @@ private[spark] class TaskSetManager(
    maybeFinishTaskSet()
  }

-  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: Option[TaskInfo])
+    : Unit = {
super nit: I'm pretty sure that even if just the return type goes over the line, our style is to switch to multi-line definition
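For reference, a sketch of the multi-line definition layout being asked for (toy `TaskInfo` stand-in; the layout, not the body, is the point):

```scala
// Illustrative only: one parameter per line, double-indented, with the return type
// after the closing parenthesis -- rather than letting just ": Unit" wrap by itself.
case class TaskInfo(taskId: Long) // stand-in for org.apache.spark.scheduler.TaskInfo

class StyleSketch {
  private def markPartitionCompleted(
      partitionId: Int,
      taskInfo: Option[TaskInfo]): Unit = {
    // real body omitted; this sketch only shows the definition layout
  }
}
```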
@@ -1133,30 +1133,51 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
      assert(tsm.runningTasks === 9)
      tsm
    }
    // we've now got 2 zombie attempts, each with 9 tasks still active but zero active attempt
    // in taskScheduler.
i don't understand "zero active attempt in taskScheduler", can you explain what you mean?
Hmm, I mean "there's no active attempt in taskScheduler", updated.
    // in taskScheduler.

    // finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
    // And it's possible since the behaviour of submitting new attempt and handling successful task
This is possible since ...
    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    taskScheduler.submitTasks(finalAttempt)
    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    // Though, finalTsm gets submitted after some tasks succeeds, but it could also know about the
    // finished partition by looking into `stageIdToFinishedPartitions` when it is being created,
    // so that it won't launch any duplicate tasks later.
I'd reword this to: Though finalTsm gets submitted with 10 tasks, the call to `taskScheduler.submitTasks` should realize that 2 tasks have already completed, and mark them appropriately, so it won't launch any duplicate tasks later (SPARK-25250).
@@ -124,7 +124,8 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
    }
  }

-  override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
+  override def taskSetFinished(manager: TaskSetManager)
+    : Unit = finishedManagers += manager
if this really needs to be multi-line (though I don't think it does), be sure to use a multi-line def and wrap the body in braces
@squito Thanks for your comments; I have updated.
Test build #103075 has finished for PR 23871 at commit
lgtm
…bout the finished partitions ## What changes were proposed in this pull request? This is an optional solution for #22806. #21131 first implemented that a task completed successfully by a zombie TaskSetManager also marks the partition as completed in the active TaskSetManager, based on the assumption that an active TaskSetManager always exists for that stage when this happens. But that's not always true, as an active TaskSetManager may not have been created yet when a previous task succeeds, and this is the reason why #22806 hit the issue. This PR extends #21131's behavior by adding `stageIdToFinishedPartitions` to TaskSchedulerImpl, which records the finished partitions whenever a task (from a zombie or active attempt) succeeds. Thus, a later created active TaskSetManager can also learn about the finished partitions by looking into `stageIdToFinishedPartitions` and won't launch any duplicate tasks. ## How was this patch tested? Added a test. Closes #23871 from Ngone51/dev-23433-25250. Lead-authored-by: wuyi <[email protected]> Co-authored-by: Ngone51 <[email protected]> Signed-off-by: Imran Rashid <[email protected]> (cherry picked from commit e5c6143) Signed-off-by: Imran Rashid <[email protected]>
merged to master & 2.4. @Ngone51 there was a merge conflict against branch 2.3; would you like to open another PR for that branch? Thanks for all the help from everyone on this. @pgandhi999 as a way to split credit, I assigned the jira to you. I definitely appreciate all the work you put in on it.
…a stage ## What changes were proposed in this pull request? This is another attempt to fix the more-than-one-active-task-set-managers bug. #17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to the DAGScheduler. This is necessary because, when the DAGScheduler gets the task completion event and it's for the last partition, the stage is finished. However, if it's a shuffle stage and it has missing map outputs, the DAGScheduler will resubmit it (see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM for a stage, and failure. This fix has a hole: let's say a stage has 10 partitions and 2 task set managers, TSM1 (zombie) and TSM2 (active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks it is still active because it hasn't finished partition 10 yet. However, the DAGScheduler gets task completion events for all 10 partitions and thinks the stage is finished. Then the same problem occurs: the DAGScheduler may resubmit the stage and cause the more-than-one-active-TSM error. #21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so it can mark itself as zombie after partitions 1-9 are completed. However, #21131 still has a hole: TSM2 may be created after the task from TSM1 has completed. Then TSM2 can't get notified about the task completion, which leads to the more-than-one-active-TSM error. #22806 and #23871 were created to fix this hole. However, the fix is complicated and there are still ongoing discussions. This PR proposes a simple fix, which is easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager. After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and to fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250). #22806 and #23871 are its followups to fix the hole. ## How was this patch tested? Existing tests. Closes #23927 from cloud-fan/scheduler. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Imran Rashid <[email protected]> (cherry picked from commit cb20fbc) Signed-off-by: Imran Rashid <[email protected]>
Thank you @squito. Once again, I appreciate all the work that you guys do, and I'm happy that we all learned something as a whole by the end of it.
Thank you @pgandhi999 @squito @cloud-fan @srowen. I'll submit a PR to help backport to 2.3 later, @squito.
…eption many times ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-25250 reports a bug: a task which fails with `CommitDeniedException` gets retried many times. This can happen when a stage has 2 task set managers, one zombie and one active. A task from the zombie TSM completes and commits to a central coordinator (assuming it's a file writing task). Then the corresponding task from the active TSM will fail with `CommitDeniedException`. `CommitDeniedException.countTowardsTaskFailures` is false, so the active TSM will keep retrying this task until the job finishes, which wastes a lot of resources. #21131 first implemented that a previously completed task from a zombie `TaskSetManager` could mark the task of the same partition completed in the active `TaskSetManager`. Later, #23871 improved the implementation to cover the corner case where an active `TaskSetManager` hasn't been created when a previous task succeeds. However, #23871 has a bug and was reverted in #24359. With hindsight, #23871 is fragile because we need to sync the states between `DAGScheduler` and `TaskScheduler` about which partitions are completed. This PR proposes a new fix: 1. When `DAGScheduler` gets a task success event from an earlier attempt, notify the `TaskSchedulerImpl` about it. 2. When `TaskSchedulerImpl` knows a partition is already completed, ask the active `TaskSetManager` to mark the corresponding task as finished, if the task is not finished yet. This fix covers the corner case because: 1. If `DAGScheduler` gets the task completion event from the zombie TSM before submitting the new stage attempt, then `DAGScheduler` knows that this partition is completed, and it will exclude this partition when creating the task set for the new stage attempt. See `DAGScheduler.submitMissingTasks`. 2. If `DAGScheduler` gets the task completion event from the zombie TSM after submitting the new stage attempt, then the active TSM is already created. Compared to the previous fix, the message loop becomes longer, so it's likely that the active task set manager has already retried the task multiple times. But this failure window won't be too big, and we want to avoid the worst case of retrying the task many times until the job finishes. So this solution is acceptable. ## How was this patch tested? A new test case. Closes #24375 from cloud-fan/fix2. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
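A rough, hedged sketch of the message flow that commit message describes; the class and method names below (e.g. `notifyPartitionCompletion`) are illustrative stand-ins, not necessarily the exact API added by #24375:

```scala
import scala.collection.mutable.{BitSet, HashMap}

// Toy names, only to illustrate the two steps described above:
// (1) the DAGScheduler reports a success from an earlier (zombie) attempt,
// (2) the scheduler asks the currently active attempt to mark that partition done.
class ActiveAttemptSketch(val stageId: Int) {
  val finishedPartitions = new BitSet
  def markPartitionCompleted(partitionId: Int): Unit = finishedPartitions += partitionId
}

class PartitionCompletionSketch {
  val activeAttemptForStage = new HashMap[Int, ActiveAttemptSketch]

  // Step 2: invoked when the DAGScheduler forwards the completion (step 1).
  def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = synchronized {
    activeAttemptForStage.get(stageId).foreach(_.markPartitionCompleted(partitionId))
  }
}
```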
What changes were proposed in this pull request?
This is an optional solution for #22806.
#21131 first implemented that a task completed successfully by a zombie TaskSetManager also marks the partition as completed in the active TaskSetManager, based on the assumption that an active TaskSetManager always exists for that stage when this happens. But that's not always true, as an active TaskSetManager may not have been created yet when a previous task succeeds, and this is the reason why #22806 hit the issue.
This PR extends #21131's behavior by adding `stageIdToFinishedPartitions` to TaskSchedulerImpl, which records the finished partitions whenever a task (from a zombie or active attempt) succeeds. Thus, a later created active TaskSetManager can also learn about the finished partitions by looking into `stageIdToFinishedPartitions` and won't launch any duplicate tasks.
How was this patch tested?
Added a test.