[SPARK-13343] speculative tasks that didn't commit shouldn't be marked as success #21653
Conversation
cc @tgravescs
ok to test
Test build #92425 has finished for PR 21653 at commit
Test build #92426 has finished for PR 21653 at commit
+1, changes look good to me. @squito see any problems with this approach?
@hthuynh2 can you fix the description "as success of failures", this is just a copy of my typo in the jira. Can you just change it to be "as success"?
I updated it. Thanks.
approach makes sense to me; I have some suggestions for making the test a bit better.
"exec1" -> "host1", | ||
"exec1" -> "host1", | ||
"exec2" -> "host2", | ||
"exec2" -> "host2")) { |
nit: double indent the contents of the List
manager.handleSuccessfulTask(3, createTaskResult(3, accumUpdatesByTask(3)))
// Verify that it kills other running attempt
verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
// Complete another attempt for the running task
can you expand this comment to explain why you're doing this? Without looking at the bug, it's easy to think this part is wrong, but in fact it's the most important part of your test, e.g.:
There is a race between the scheduler asking to kill the other task, and that task actually finishing. We simulate what happens if the other task finishes before we kill it.
manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))

assert(manager.taskInfos(3).successful == true)
assert(manager.taskInfos(4).killed == true)
it seems the main thing you're trying to change here is what gets passed to DAGScheduler.taskEnded, so shouldn't you be verifying that here?
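For illustration, a rough sketch of such a verification (not the suite's final code): if the test swaps in a mocked DAGScheduler on the fake scheduler, the TaskEndReason reported for the second attempt can be captured and checked. The mockDAGScheduler fixture, the Mockito imports, the settable dagScheduler reference, and the five-argument taskEnded signature below are assumptions about the Spark 2.x test setup, not something taken from this PR.

import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, times, verify}
import org.apache.spark.{TaskEndReason, TaskKilled}

// Hypothetical: install a mocked DAGScheduler before driving the two status updates.
val mockDAGScheduler = mock(classOf[DAGScheduler])
sched.dagScheduler = mockDAGScheduler  // assumes the fake scheduler exposes a settable reference

// ... handleSuccessfulTask(3, ...) and handleSuccessfulTask(4, ...) as in the fragments above ...

// taskEnded is reported twice (once per attempt); the second attempt should end as killed.
val reasonCaptor = ArgumentCaptor.forClass(classOf[TaskEndReason])
verify(mockDAGScheduler, times(2)).taskEnded(any(), reasonCaptor.capture(), any(), any(), any())
assert(reasonCaptor.getAllValues.get(1).isInstanceOf[TaskKilled])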
IIUC this speculative task is not really killed, right? It is actually ignored. Is it worth adding a new TaskState for this case?
@jiangxb1987 Yes, you are correct that it is actually ignored. I don't think it is worth adding a new TaskState, because we would need to make changes in many places without getting much benefit from it. Instead, I think we can add a message to the kill reason to differentiate it from a task that was actually killed and to inform the user.
Test build #92688 has finished for PR 21653 at commit
@squito Thanks for the suggestions. I updated it. Could you please have a look at it to see if there is anything else I need to change? Thanks.
Test build #92690 has finished for PR 21653 at commit
@@ -723,6 +723,13 @@ private[spark] class TaskSetManager(
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  // Check if any other attempt succeeded before this and this attempt has not been handled
  if (successful(index) && killedByOtherAttempt(index)) {
For completeness, we will also need to 'undo' the changes in enqueueSuccessfulTask: to reverse the counters in canFetchMoreResults.
(Orthogonal to this PR): Looking at the use of killedByOtherAttempt, I see that there is a bug in executorLost w.r.t. how it is updated - hopefully a fix for SPARK-24755 won't cause issues here.
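For context, this is roughly what canFetchMoreResults bumps when a successful result is enqueued; it is a paraphrased sketch of TaskSetManager rather than the verbatim Spark source, so treat the exact body and abort message as assumptions. The decrements added later in this PR undo exactly these two updates for the already-handled partition.

// Paraphrased sketch, not the verbatim Spark source.
def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
  totalResultSize += size   // counted against spark.driver.maxResultSize
  calculatedTasks += 1      // counted toward the number of results handled so far
  if (maxResultSize > 0 && totalResultSize > maxResultSize) {
    // Too much serialized result data: abort the task set instead of fetching more.
    abort("Total size of serialized results is bigger than spark.driver.maxResultSize")
    false
  } else {
    true
  }
}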
Let's review #21729 before this since it's changing the type on killedByOtherAttempt.
…3343 resolve conflict with SPARK-24755
@tgravescs I updated it. Can you please have a look at it when you have time? Thank you.
Test build #93302 has finished for PR 21653 at commit
@@ -723,6 +723,21 @@ private[spark] class TaskSetManager(
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  // Check if any other attempt succeeded before this and this attempt has not been handled
  if (successful(index) && killedByOtherAttempt.contains(tid)) {
    calculatedTasks -= 1
please add a comment here about cleaning up the things that were incremented earlier while handling it as successful
val resultSizeAcc = result.accumUpdates.find(a =>
  a.name == Some(InternalAccumulator.RESULT_SIZE))
if (resultSizeAcc.isDefined) {
  totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
the downside here is we already incremented and other tasks could have checked and failed before we decrement, but unless someone else has a better idea this is better than it is now.
I agree, I don't see a better option.
LGTM, pending @tgravescs's suggestions.
LGTM
Test build #93418 has finished for PR 21653 at commit
@tgravescs Can you please run the test again? Thank you.
test this please
Test build #93447 has finished for PR 21653 at commit
test this please
Test build #93474 has finished for PR 21653 at commit
test this please
3 similar comments
test this please
test this please
test this please
I kicked off the test manually at https://spark-prs.appspot.com/users/hthuynh2. I dunno why the test triggering via comments stops working on some PRs.
Test build #4222 has finished for PR 21653 at commit
+1
merged to master, thanks @hthuynh2
Description
Currently, speculative tasks that didn't commit can show up as success (depending on the timing of the commit). This is a bit confusing because the task didn't really succeed in the sense that it didn't write anything.
I think these tasks should be marked as KILLED, or something that makes it more obvious to the user exactly what happened. If the task happened to hit the timing where it got a commit-denied exception, then it shows up as failed and counts against your task failures. It shouldn't count against task failures, since that failure really doesn't matter.
MapReduce handles this situation, so perhaps we can look there for a model.
How can this issue happen?
When both attempts of a task finish before the driver sends the command to kill one of them, both of them send the status update FINISHED to the driver. The driver calls TaskSchedulerImpl to handle one successful task at a time. When it handles the first successful task, it sends the command to kill the other copy of the task; however, because that task has already finished, the executor ignores the command. After finishing handling the first attempt, the driver processes the second one. Although all actions on the result of this task are skipped, this copy of the task is still marked as SUCCESS. As a result, even though this issue does not affect the result of the job, it can confuse users because both attempts appear to be successful.
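To make the timing concrete, here is a minimal driver-side sketch mirroring the unit-test fragments quoted in the review above (attempt ids 3 and 4 are the two copies of the same partition, and the helper names come from those fragments):

// Both copies have already reported FINISHED before the driver reacts to either update.
// Handling the first result marks the partition as successful and asks the backend to
// kill the other copy - a request the executor ignores, because that task is already done.
manager.handleSuccessfulTask(3, createTaskResult(3, accumUpdatesByTask(3)))
// Handling the second result: its output was never committed, yet before this PR the
// attempt was still recorded as SUCCESS, which is the confusing state described above.
manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))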
How does this PR fix the issue?
The simple way to fix this issue is that when the TaskSetManager handles a successful task, it checks whether any other attempt has already succeeded. If so, it calls handleFailedTask with state == KILLED and reason == TaskKilled("another attempt succeeded") to handle this task as being killed.
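Assembled from the diff fragments quoted in the review above, the guard looks roughly like this (a sketch; the exact comment wording and kill-reason string in the merged code may differ):

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  // Another attempt for this partition already succeeded and marked this one as killed,
  // so treat this late result as a kill instead of a second success.
  if (successful(index) && killedByOtherAttempt.contains(tid)) {
    // Undo the counters bumped in canFetchMoreResults when this result was enqueued.
    calculatedTasks -= 1
    val resultSizeAcc = result.accumUpdates.find(a =>
      a.name == Some(InternalAccumulator.RESULT_SIZE))
    if (resultSizeAcc.isDefined) {
      totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
    }
    // Report this attempt as killed rather than successful.
    handleFailedTask(tid, TaskState.KILLED, TaskKilled("another attempt succeeded"))
    return
  }
  // ... existing success handling continues below ...
}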
How was this patch tested?
I tested this manually by running applications that caused the issue before a few times, and observed that the issue does not happen again. I also added a unit test in TaskSetManagerSuite to check that if we call handleSuccessfulTask to handle status updates for two copies of a task, only the one that is handled first is marked as SUCCESS.