[SPARK-27474][CORE] avoid retrying a task failed with CommitDeniedException many times #24375
Conversation
cc @Ngone51 @pgandhi999 @squito @jiangxb1987 @zsxwing I think this is a more robust fix.
```diff
   */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(
+  private[scheduler] def markPartitionCompleted(
```
Doesn't this method need to acquire synchronization on the TaskSchedulerImpl object?
No, it doesn't. It's called in the task-result-getter thread.
Correct me if I am wrong, but if I follow the call chain correctly, you are calling the method markPartitionCompleted from enqueuePartitionCompletionNotification, which is called from notifyPartitionCompletion, which gets invoked inside the DAGScheduler event loop thread. Am I missing something here?
See https://github.com/apache/spark/pull/24375/files#diff-c69e9db8cf7981fd484d4665181695a1R159: enqueuePartitionCompletionNotification calls markPartitionCompleted within getTaskResultExecutor.
Ahh I see it now, thanks.
taskResultGetter is a multithreaded pool (default 4), so I think you still need extra protection here.
Ah, good catch! I should add synchronization here.
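As an aside for readers following the thread, here is a minimal, self-contained sketch of the race and the agreed fix, using invented names (this is not Spark code): several pool threads call the handler, so it must take a common lock, just as TaskSchedulerImpl.handleSuccessfulTask does.

```scala
import java.util.concurrent.Executors

object SchedulerLockSketch {
  // Stands in for the scheduler state that markPartitionCompleted mutates.
  private val completedPartitions = scala.collection.mutable.HashSet.empty[Int]

  // Without `synchronized`, concurrent updates from the pool's threads could
  // corrupt the HashSet or silently lose updates.
  def markPartitionCompleted(partitionId: Int): Unit = synchronized {
    completedPartitions += partitionId
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4) // the default pool size mentioned above
    (0 until 1000).foreach(p => pool.execute(() => markPartitionCompleted(p)))
    pool.shutdown()
  }
}
```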
Just FYI, after adding the synchronized statement, this PR is almost identical to PR #22806 before it got restructured. I could have simply added synchronized to markPartitionCompletedInAllTaskSets() instead of restructuring the PR, but did not do so because of the following suggestion by @squito, which I agree with:

"I'm not sure how to fix this. You could make TaskSchedulerImpl.markPartitionCompletedInAllTaskSets() synchronized, but then you're getting a lock on the TaskSchedulerImpl in the DAGScheduler event loop. That's not good for scheduling throughput, and we also want to make sure there is no chance of deadlock."

So I would like to once again ask the same question as above: is the scheduling throughput impacted by adding the extra synchronization? Also, is there a possibility of hitting a deadlock due to the synchronized block?
Ah, thanks for pointing it out! Yes, it's very similar, except that this PR sends a message to the task result getter thread pool first, to avoid locking in the DAGScheduler event loop thread.

I don't think we will hit a deadlock. The task result getter thread pool calls TaskSchedulerImpl.handleSuccessfulTask, which is synchronized too. And handleSuccessfulTask calls methods of TaskSetManager, the same as the newly added handlePartitionCompleted does.
Test build #104587 has finished for PR 24375 at commit
Retest this please.
Have you considered making the set of completed partitions a structure that is safe to access from multiple threads, e.g. an AtomicBitSet? The more I think about it, the more that seems like the best solution, given how important that set is to multiple threads.
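Neither the JDK nor Spark ships an AtomicBitSet at this point, so the following is a hypothetical sketch of what the suggested structure could look like, built on java.util.concurrent.atomic.AtomicLongArray; the class and method names are invented for illustration, not taken from Spark.

```scala
import java.util.concurrent.atomic.AtomicLongArray

// Hypothetical lock-free bit set: one bit per partition, safe to read and
// write concurrently from the event loop and the result-getter threads.
class AtomicBitSet(numBits: Int) {
  private val words = new AtomicLongArray((numBits + 63) / 64)

  // Atomically sets the bit; returns true iff this call flipped it from 0 to 1.
  def set(index: Int): Boolean = {
    val wordIdx = index >> 6
    val mask = 1L << (index & 63)
    var old = words.get(wordIdx)
    while ((old & mask) == 0L) {
      if (words.compareAndSet(wordIdx, old, old | mask)) return true
      old = words.get(wordIdx)
    }
    false // another thread set the bit first
  }

  def get(index: Int): Boolean =
    (words.get(index >> 6) & (1L << (index & 63))) != 0L
}
```

A nice property of the compare-and-set loop is that set reports whether this caller was the one that completed the partition, which lines up with the one-completion-per-partition semantics discussed in this thread.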
```scala
partitionToIndex.get(partitionId).foreach { index =>
  if (!successful(index)) {
    if (speculationEnabled && !isZombie) {
      successfulTaskDurations.insert(taskInfo.duration)
    }
```
I'm undecided about this part of the change in general (I see an argument for doing it both ways), but I'd still prefer that it not be in this PR.
Then how about we use 0 as the task duration here? There is no taskInfo, so I have to update this part.
Test build #104598 has finished for PR 24375 at commit
Yes, it's kind of a variant of #21131. This PR covers the corner case that an active TaskSetManager hasn't been created when a previous task succeeds.

There is a corner case that is un-fixable: when a task from a zombie TSM completes, and before we notify the active TSM about it, the active TSM has already submitted and completed the task for the same partition. #21131 doesn't cover it, and this PR doesn't cover it either. But this PR does make it more likely to happen, because we go through the event loop, so the active TSM takes longer to learn that a partition has completed.

Maybe we can combine the solutions? e.g. when a task from a zombie TSM completes, notify the active TSM immediately. Later the …
Though, there is nothing we can do when the task in TSM 1.1 (active) has been submitted and completed successfully while TSM 1.0 (zombie) still has a running task for the same partition. The task in TSM 1.0 (zombie) would fail later, but it does not have any impact on TSM 1.1 (active), and TSM 1.0/1.1 would both finish normally.
I think we are discussing an optimization (saving resources) rather than a bug? Nothing will go wrong even without #21131.

UPDATE: For tasks that write to file sources, which need to commit to the central coordinator, only one task can complete for one partition. In this case, if a task from the zombie TSM completes first, then the corresponding task in the active TSM will fail and get retried, and fail again, until the stage finishes (all partitions complete). The job doesn't fail, but a lot of resources are wasted. If the task from the active TSM completes first, then the corresponding task from the zombie TSM will fail. This is totally fine, as the zombie TSM does not retry tasks.

That said, this PR tries to avoid the worst case described above. Even if we go through the event loop now, I don't think it will take so long that the active TSM has already finished.
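To make the one-commit-per-partition behavior concrete, here is a toy, self-contained model. It is not Spark's OutputCommitCoordinator API; it only captures the first-claimer-wins rule that surfaces as CommitDeniedException for the losing attempt.

```scala
import java.util.concurrent.ConcurrentHashMap

// Toy first-claimer-wins coordinator (names invented for illustration).
class ToyCommitCoordinator {
  // partition -> the attempt id that is allowed to commit
  private val winners = new ConcurrentHashMap[Int, java.lang.Long]()

  // The first attempt to claim a partition wins; every later attempt is denied.
  def canCommit(partition: Int, attemptId: Long): Boolean = {
    val prev = winners.putIfAbsent(partition, attemptId)
    prev == null || prev.longValue == attemptId
  }
}

object ToyCommitCoordinatorDemo extends App {
  val coord = new ToyCommitCoordinator
  println(coord.canCommit(partition = 0, attemptId = 7L)) // true: zombie TSM's task commits first
  println(coord.canCommit(partition = 0, attemptId = 9L)) // false: active TSM's task is denied
}
```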
Test build #104611 has finished for PR 24375 at commit
retest this please
Test build #104620 has finished for PR 24375 at commit
Does this PR really cover the corner case? Am I missing something?

Yeah, I think this PR could avoid the worst case even if we go through the event loop. But couldn't #21131 do that as well? What's the advantage compared to #21131?

I'm neutral on combining the solutions, since we're doing an optimization (saving resources) rather than fixing a bug. Though, I'm wondering whether it would help much.
#21131 did not cover the corner case; that's why we opened #23871. Am I missing something? Maybe you were asking why I prefer this PR over #23871. The reason is, we don't need to sync the states between DAGScheduler and TaskScheduler about which partitions are completed.
I've updated the PR description to make it clearer what the problem is and why this fix is proposed.
No, I'm asking "does this PR really cover the corner case that an active TaskSetManager hasn't been created when a previous task succeeds"? If it does, how? Because I didn't see it in the code.
I've added it to the PR description. Let me quote it here.
Oh, yes, now I see how this PR fixes the corner case and works in the end. It's really a good solution. And now, I would suggest not combining it with #21131.
@cloud-fan I tested the PR a couple of times, tests look good. LGTM.
LGTM
So currently there are three places that track the partition status:

It would be ideal to keep track of the pending partitions of each stage in one data structure and update it synchronously. The major problem here is that the DAGScheduler relies on MapStatusTracker to read the shuffle partition statuses, which is updated asynchronously. If we don't want to make major changes to the current infrastructure, the best approach I can think of is to just let the DAGScheduler make all the final decisions on whether a stage has completed, and all TSMs shall update their own status according to that. The only shortcoming is that there is a time window during which some TSM(s) have finished all their tasks but the MapStatusTracker is not yet updated; in this case we shall see unnecessary tasks still running. To further avoid this case, we can implement another approach that Wenchen suggested: have a status cache for the TSM, i.e. when a task from a zombie TSM completes, notify the active TSM immediately.
The PR looks good from my side. The only concern I have is whether we should backport this to 2.4. I know the previous PRs were backported, but my feeling is that, strictly speaking, this PR is an improvement rather than a bugfix, and it's making a major change to a relatively critical and fragile part of Spark. Considering there has been one instance where a PR caused a regression in 2.4.1, personally I would suggest we be more conservative and only include this in the master branch. Anyway, it's just my two cents, feel free to correct me @squito @cloud-fan
Some minor comments, but overall I think this is fine.

As for what branch to put it in... I would have liked to get this into 2.4 (I do think this is a bug fix). But I'm also pretty embarrassed about the prior regression, and it probably makes more sense to tread carefully here and only put this in master.
```scala
// finished. Here we notify the task scheduler to skip running tasks for the same partition,
// to save resource.
if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
  taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
```
You could pass in event.taskInfo here, or perhaps even just event.taskInfo.duration.
```diff
 partitionToIndex.get(partitionId).foreach { index =>
   if (!successful(index)) {
     if (speculationEnabled && !isZombie) {
-      successfulTaskDurations.insert(taskInfo.duration)
+      // The task is skipped, its duration should be 0.
+      successfulTaskDurations.insert(0)
```
Sorry I wasn't clear before -- I don't think you should insert 0, that would really mess up the speculation calculation. I suggested above to pass in the duration. But if there is some reason I'm missing for why that isn't possible, then it's OK to go back to the way you originally had it, by entirely skipping adding to successfulTaskDurations.
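To illustrate the concern, here is a toy, self-contained example of how inserting 0 distorts the statistic speculation is based on. The durations are made up, and 1.5 stands in for spark.speculation.multiplier's default; this is an illustration, not Spark's actual MedianHeap code.

```scala
object SpeculationSkewSketch {
  // Speculation compares running tasks against multiplier * median(successful durations).
  def median(xs: Seq[Long]): Long = xs.sorted.apply(xs.size / 2)

  def main(args: Array[String]): Unit = {
    val realDurations = Seq(100L, 110L, 120L, 130L)
    val withZeros = realDurations ++ Seq(0L, 0L, 0L, 0L) // skipped tasks recorded as 0
    println(1.5 * median(realDurations)) // 180.0: a sane threshold
    println(1.5 * median(withZeros))     // 150.0: zeros drag the threshold down,
                                         // so healthy tasks get speculated spuriously
  }
}
```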
```diff
@@ -155,6 +155,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
     }
   }

+  def enqueuePartitionCompletionNotification(stageId: Int, partitionId: Int): Unit = {
+    getTaskResultExecutor.execute(() => Utils.logUncaughtExceptions {
+      scheduler.handlePartitionCompleted(stageId, partitionId)
```
As we have to get a lock on the TaskSchedulerImpl anyway, is there any advantage to this indirection here? Just avoiding acquiring the TaskSchedulerImpl lock inside the DAGScheduler event loop? If so, that is worth a comment here.
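For reference, the clarifying comment being asked for might read something like this; the wording is a suggestion layered onto the snippet above, not the text that was actually committed:

```scala
def enqueuePartitionCompletionNotification(stageId: Int, partitionId: Int): Unit = {
  // Hop to the task-result-getter pool instead of handling this inline: the
  // handler takes the TaskSchedulerImpl lock, and holding that lock inside the
  // DAGScheduler event loop would hurt scheduling throughput.
  getTaskResultExecutor.execute(() => Utils.logUncaughtExceptions {
    scheduler.handlePartitionCompleted(stageId, partitionId)
  })
}
```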
Test build #104735 has finished for PR 24375 at commit
lgtm assuming tests pass
```scala
for (id <- Set(0, 1)) {
  taskSetManager1.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
  assert(sched.endedTasks(id) === Success)
}
```
I think now that you're using the duration across stage attempts for speculation, you don't need to change this test anymore -- but I also don't feel strongly that we really need to keep the old test either.
This test case creates 2 TSMs, completes the task of one TSM, and expects it to mark the corresponding task of the other TSM as completed. This is not true anymore, as we need to go through the DAGScheduler to make that happen, but this test suite uses a fake DAGScheduler. To simplify the code, I just create one TSM and then call markPartitionCompleted directly.
Test build #104738 has finished for PR 24375 at commit
cc @dbtsai
Retest this please.
Test build #104789 has finished for PR 24375 at commit
retest this please
Test build #104980 has finished for PR 24375 at commit
thanks, merging to master!
…culative tasks

## What changes were proposed in this pull request?

This is a followup of #24375. When `TaskSetManager` skips a task because its corresponding partition is already completed by other `TaskSetManager`s, we should not consider the duration of the task that is finished by other `TaskSetManager`s to schedule the speculative tasks of this `TaskSetManager`.

## How was this patch tested?

updated test case

Closes #24485 from cloud-fan/minor.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
…eption many times

Ref: LIHADOOP-53705

https://issues.apache.org/jira/browse/SPARK-25250 reports a bug that a task which failed with `CommitDeniedException` gets retried many times.

This can happen when a stage has 2 task set managers, one zombie, one active. A task from the zombie TSM completes, and commits to a central coordinator (assuming it's a file writing task). Then the corresponding task from the active TSM will fail with `CommitDeniedException`. `CommitDeniedException.countTowardsTaskFailures` is false, so the active TSM will keep retrying this task until the job finishes. This wastes a lot of resources.

However, apache#23871 has a bug and was reverted in apache#24359. With hindsight, apache#23871 is fragile because we need to sync the states between `DAGScheduler` and `TaskScheduler` about which partitions are completed.

This PR proposes a new fix:
1. When `DAGScheduler` gets a task success event from an earlier attempt, notify the `TaskSchedulerImpl` about it.
2. When `TaskSchedulerImpl` knows a partition is already completed, ask the active `TaskSetManager` to mark the corresponding task as finished, if the task is not finished yet.

This fix covers the corner case, because:
1. If `DAGScheduler` gets the task completion event from the zombie TSM before submitting the new stage attempt, then `DAGScheduler` knows that this partition is completed, and it will exclude this partition when creating the task set for the new stage attempt. See `DAGScheduler.submitMissingTasks`.
2. If `DAGScheduler` gets the task completion event from the zombie TSM after submitting the new stage attempt, then the active TSM is already created.

Compared to the previous fix, the message loop becomes longer, so it's likely that the active task set manager has already retried the task multiple times. But this failure window won't be too big, and we want to avoid the worst case that retries the task many times until the job finishes. So this solution is acceptable.

A new test case.

Closes apache#24375 from cloud-fan/fix2.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

RB=2113301
BUG=LIHADOOP-53705
G=spark-reviewers
R=chsingh
A=chsingh
What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-25250 reports a bug that a task which failed with CommitDeniedException gets retried many times.

This can happen when a stage has 2 task set managers, one zombie, one active. A task from the zombie TSM completes, and commits to a central coordinator (assuming it's a file writing task). Then the corresponding task from the active TSM will fail with CommitDeniedException. CommitDeniedException.countTowardsTaskFailures is false, so the active TSM will keep retrying this task until the job finishes. This wastes a lot of resources.

#21131 first implemented that a previously completed task from a zombie TaskSetManager could mark the task of the same partition completed in the active TaskSetManager. Later, #23871 improved the implementation to cover a corner case that an active TaskSetManager hasn't been created when a previous task succeeds.

However, #23871 has a bug and was reverted in #24359. With hindsight, #23871 is fragile because we need to sync the states between DAGScheduler and TaskScheduler about which partitions are completed.

This PR proposes a new fix:

1. When DAGScheduler gets a task success event from an earlier attempt, notify the TaskSchedulerImpl about it.
2. When TaskSchedulerImpl knows a partition is already completed, ask the active TaskSetManager to mark the corresponding task as finished, if the task is not finished yet.

This fix covers the corner case, because:

1. If DAGScheduler gets the task completion event from the zombie TSM before submitting the new stage attempt, then DAGScheduler knows that this partition is completed, and it will exclude this partition when creating the task set for the new stage attempt. See DAGScheduler.submitMissingTasks.
2. If DAGScheduler gets the task completion event from the zombie TSM after submitting the new stage attempt, then the active TSM is already created.

Compared to the previous fix, the message loop becomes longer, so it's likely that the active task set manager has already retried the task multiple times. But this failure window won't be too big, and we want to avoid the worst case that retries the task many times until the job finishes. So this solution is acceptable.

How was this patch tested?

A new test case.