
[SPARK-13343] speculative tasks that didn't commit shouldn't be marked as success #21653

Closed
wants to merge 8 commits

Conversation

@hthuynh2 commented Jun 28, 2018

Description
Currently, speculative tasks that didn't commit can show up as SUCCESS (depending on the timing of the commit). This is confusing because such a task didn't really succeed, in the sense that it didn't write anything.
I think these tasks should be marked as KILLED, or as something that makes it more obvious to the user exactly what happened. If a task happens to hit the timing where it gets a commit-denied exception, it shows up as FAILED and counts against your task failures. It shouldn't count against task failures, since that failure really doesn't matter.
MapReduce handles these situations, so perhaps we can look there for a model.


How can this issue happen?
When both attempts of a task finish before the driver sends the command to kill one of them, both send the status update FINISHED to the driver. The driver calls TaskSchedulerImpl to handle one successful task at a time. When it handles the first successful attempt, it sends the command to kill the other copy of the task; however, because that copy has already finished, the executor ignores the command. After handling the first attempt, the driver processes the second one. Although all actions on this attempt's result are skipped, the copy is still marked as SUCCESS. As a result, even though this issue does not affect the result of the job, it can confuse users because both attempts appear successful.
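The race above can be modeled in a few lines. This is an illustrative sketch only: `RaceSketch`, `handleFinished`, `successful`, and `attemptState` are made-up names standing in for the TaskSetManager bookkeeping, not Spark's actual API.

```scala
// Illustrative model of the pre-fix behavior (all names here are hypothetical,
// not Spark's real classes). Two attempts of the same task index both report
// FINISHED before the kill command takes effect, so both end up SUCCESS.
object RaceSketch {
  sealed trait State
  case object SUCCESS extends State
  case object KILLED extends State

  val successful = scala.collection.mutable.Set[Int]()           // task indexes with a winning attempt
  val attemptState = scala.collection.mutable.Map[Long, State]() // tid -> displayed state

  def handleFinished(tid: Long, index: Int): Unit = {
    if (!successful(index)) {
      successful += index // the first attempt's result is actually used
    }
    // The later attempt's result is skipped, but its state is still SUCCESS.
    attemptState(tid) = SUCCESS
  }

  def main(args: Array[String]): Unit = {
    handleFinished(3L, 0) // original attempt finishes first
    handleFinished(4L, 0) // speculative copy finished before the kill arrived
    println(attemptState) // both attempts show SUCCESS -- the confusing outcome
  }
}
```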

How does this PR fix the issue?
A simple way to fix this issue: when the TaskSetManager handles a successful task, it checks whether any other attempt has already succeeded. If so, it calls handleFailedTask with state == KILLED and reason == TaskKilled("another attempt succeeded") to handle this task as being killed.
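A minimal sketch of that check, with hypothetical simplified names (the real logic lives in TaskSetManager.handleSuccessfulTask and routes the late attempt through handleFailedTask with a TaskKilled reason):

```scala
// Simplified sketch of the fix (hypothetical names; the real code calls
// handleFailedTask with state == KILLED and
// reason == TaskKilled("another attempt succeeded")).
object FixSketch {
  sealed trait State
  case object SUCCESS extends State
  case object KILLED extends State

  val successful = scala.collection.mutable.Set[Int]()            // indexes with a winning attempt
  val killedByOtherAttempt = scala.collection.mutable.Set[Long]() // tids we asked to kill
  val attemptState = scala.collection.mutable.Map[Long, State]()

  def handleSuccessfulTask(tid: Long, index: Int, otherAttempts: Seq[Long]): Unit = {
    // Another attempt already succeeded and this one was marked for killing:
    // record the late FINISHED update as killed instead of a second SUCCESS.
    if (successful(index) && killedByOtherAttempt(tid)) {
      attemptState(tid) = KILLED // reason: "another attempt succeeded"
      return
    }
    successful += index
    attemptState(tid) = SUCCESS
    // The driver asks to kill the other copies; remember their tids so a
    // late status update from one of them is not recorded as SUCCESS.
    killedByOtherAttempt ++= otherAttempts
  }
}
```

With this in place, handling tid 3 first and tid 4 second leaves tid 3 as SUCCESS and tid 4 as KILLED, which is the behavior the PR's unit test checks.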

How was this patch tested?
I tested this manually by running applications that caused the issue before, a few times, and observed that the issue no longer happens. I also added a unit test in TaskSetManagerSuite to verify that if we call handleSuccessfulTask to handle status updates for two copies of a task, only the one handled first is marked as SUCCESS.

@hthuynh2 (Author)

cc @tgravescs

@tgravescs (Contributor)

ok to test

@SparkQA commented Jun 28, 2018

Test build #92425 has finished for PR 21653 at commit 8f7d981.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 28, 2018

Test build #92426 has finished for PR 21653 at commit 980a933.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

+1, changes look good to me.

@squito see any problems with this approach?

@tgravescs (Contributor)

@hthuynh2 can you fix the description's "as success of failures"? This is just a copy of my typo in the jira. Can you change it to just "as success"?

@hthuynh2 (Author) commented Jul 2, 2018

I updated it. Thanks.

@squito (Contributor) left a comment

approach makes sense to me, I have some suggestions for making the test a bit better.

"exec1" -> "host1",
"exec1" -> "host1",
"exec2" -> "host2",
"exec2" -> "host2")) {

nit: double indent the contents of the List

manager.handleSuccessfulTask(3, createTaskResult(3, accumUpdatesByTask(3)))
// Verify that it kills other running attempt
verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
// Complete another attempt for the running task

can you expand this comment to explain why you're doing this? Without looking at the bug, it's easy to think this part is wrong, but in fact it's the most important part of your test, e.g.:

There is a race between the scheduler asking to kill the other task, and that task actually finishing. We simulate what happens if the other task finishes before we kill it.

manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))

assert(manager.taskInfos(3).successful == true)
assert(manager.taskInfos(4).killed == true)

it seems the main thing you're trying to change here is what gets passed to DAGScheduler.taskEnded, so shouldn't you be verifying that here?

@jiangxb1987 (Contributor)

IIUC this speculative task is not really killed, right? It is actually ignored. Is it worth adding a new TaskState for this case?

@hthuynh2 (Author) commented Jul 3, 2018

@jiangxb1987 yes, you are correct that it is actually ignored. I don't think it's worth adding a new TaskState, because we would need changes in many places without getting much benefit. Instead, I think we can add a message to the kill reason to differentiate it from a task that is actually killed and to inform the user.

@SparkQA commented Jul 6, 2018

Test build #92688 has finished for PR 21653 at commit f66fcab.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hthuynh2 (Author) commented Jul 6, 2018

@squito Thanks for the suggestions. I updated it. Could you please have a look at it to see if there is anything else I need to change? Thanks.

@SparkQA commented Jul 6, 2018

Test build #92690 has finished for PR 21653 at commit 3c655f2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -723,6 +723,13 @@ private[spark] class TaskSetManager(
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
// Check if any other attempt succeeded before this and this attempt has not been handled
if (successful(index) && killedByOtherAttempt(index)) {

For completeness, we will also need to 'undo' the changes in enqueueSuccessfulTask: reverse the counters in canFetchMoreResults.

(Orthogonal to this PR): looking at the use of killedByOtherAttempt, I see that there is a bug in executorLost w.r.t. how it is updated; hopefully a fix for SPARK-24755 won't cause issues here.

@tgravescs (Contributor)

Let's review #21729 before this, since it's changing the type of killedByOtherAttempt.

@tgravescs (Contributor)

#21729 has been merged. @hthuynh2, can you update this one?

@hthuynh2 (Author)

@tgravescs I updated it. Can you please have a look at it when you have time. Thank you.

@SparkQA commented Jul 20, 2018

Test build #93302 has finished for PR 21653 at commit 2c7d33d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -723,6 +723,21 @@ private[spark] class TaskSetManager(
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
// Check if any other attempt succeeded before this and this attempt has not been handled
if (successful(index) && killedByOtherAttempt.contains(tid)) {
calculatedTasks -= 1
@tgravescs commented Jul 20, 2018

please add a comment here about cleaning up the things that were incremented earlier while handling this as successful

val resultSizeAcc = result.accumUpdates.find(a =>
a.name == Some(InternalAccumulator.RESULT_SIZE))
if (resultSizeAcc.isDefined) {
totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value

The downside here is that we already incremented, and other tasks could have checked and failed before we decrement, but unless someone else has a better idea this is better than it is now.
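The trade-off being discussed shows up in a toy version of the bookkeeping. Names and the limit value below are illustrative, not Spark's real constants; in Spark the increment happens in enqueueSuccessfulTask via canFetchMoreResults (which enforces spark.driver.maxResultSize), and the PR subtracts the duplicate's contribution back out:

```scala
// Toy model of the result-size bookkeeping (hypothetical names and values).
object CounterSketch {
  val maxResultSize = 100L // stand-in for spark.driver.maxResultSize
  var totalResultSize = 0L
  var calculatedTasks = 0

  // Increment first, then check -- mirroring the increment-then-test shape
  // discussed above. Another task can observe the inflated total in between.
  def canFetchMoreResults(resultSize: Long): Boolean = {
    totalResultSize += resultSize
    calculatedTasks += 1
    totalResultSize <= maxResultSize
  }

  // Reversal for a raced duplicate "success": subtract what was added above.
  def undoDuplicate(resultSize: Long): Unit = {
    totalResultSize -= resultSize
    calculatedTasks -= 1
  }
}
```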

I agree, I don't see a better option.

@mridulm (Contributor) left a comment

LGTM, pending @tgravescs's suggestions.


@jiangxb1987 (Contributor) left a comment

LGTM

@SparkQA commented Jul 23, 2018

Test build #93418 has finished for PR 21653 at commit b6585da.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hthuynh2 (Author)

@tgravescs Can you please run the test again, thank you.

@tgravescs (Contributor)

test this please

@SparkQA commented Jul 23, 2018

Test build #93447 has finished for PR 21653 at commit b6585da.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

test this please

@SparkQA commented Jul 24, 2018

Test build #93474 has finished for PR 21653 at commit b6585da.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

test this please

3 similar comments

@squito (Contributor) commented Jul 26, 2018

I kicked off the test manually at https://spark-prs.appspot.com/users/hthuynh2. I don't know why the test triggering via comments stops working on some PRs.

@SparkQA commented Jul 26, 2018

Test build #4222 has finished for PR 21653 at commit b6585da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

+1

@tgravescs (Contributor)

merged to master, thanks @hthuynh2

@asfgit asfgit closed this in 5828f41 Jul 27, 2018