[SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask #686
@@ -1062,10 +1062,15 @@ class DAGScheduler(
         // This is the only job that uses this stage, so fail the stage if it is running.
         val stage = stageIdToStage(stageId)
         if (runningStages.contains(stage)) {
-          taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-          val stageInfo = stageToInfos(stage)
-          stageInfo.stageFailed(failureReason)
-          listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+          try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+            val stageInfo = stageToInfos(stage)
+            stageInfo.stageFailed(failureReason)
+            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+          } catch {
+            case e: UnsupportedOperationException =>
+              logInfo(s"Could not cancel tasks for stage $stageId", e)
+          }
         }
       }
     }
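For context on why the try is needed here: on a backend that cannot kill tasks, cancellation surfaces as an UnsupportedOperationException out of cancelTasks. The sketch below is a simplified, self-contained illustration of that pattern; the trait and method signatures mirror Spark's SchedulerBackend and TaskScheduler only loosely and are assumptions, not the real API.

import scala.collection.Seq

// Simplified sketch (not Spark's actual interfaces): a backend that does not override
// killTask falls back to a default that throws, so cancelTasks propagates the exception,
// which the DAGScheduler change above now catches and logs.
trait BackendSketch {
  def killTask(taskId: Long, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException("this backend cannot kill tasks")
}

object NoKillBackend extends BackendSketch // stands in for a backend without killTask

object CancelSketch {
  def cancelTasks(stageId: Int, runningTaskIds: Seq[Long]): Unit =
    runningTaskIds.foreach(id => NoKillBackend.killTask(id, interruptThread = false))

  def main(args: Array[String]): Unit =
    try {
      cancelTasks(stageId = 0, runningTaskIds = Seq(1L, 2L))
    } catch {
      case e: UnsupportedOperationException =>
        println(s"Could not cancel tasks for stage 0: $e")
    }
}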
@@ -1155,7 +1160,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
       case x: Exception =>
         logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
           .format(x.getMessage))
-        dagScheduler.doCancelAllJobs()
+        try {
+          dagScheduler.doCancelAllJobs()
+        } catch {
+          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
What throwable do you get here? It looks like the UnsupportedOperationException is caught in DAGScheduler and not re-thrown?

Who knows just what the SchedulerBackend is going to throw now or in the future? UnsupportedOperationException is handled in failJobAndIndependentStages, but if something else is thrown out of the backend or doCancelAllJobs fails for any other reason, we'll just log it here and continue trying to shut down.

Oh, I see. I didn't realize that this was part of the shutdown code; this makes sense now.

Should probably not catch Throwable.

@aarondav So shall I just back off to Exception?

Alas, I'd have to go with @markhamstra's original solution (catching Throwable) on this one. I didn't include Akka-related code in my refactoring for precisely the reason that we don't have a great solution for it right now. We can factor it back to Exception in a later cleanup when such a solution appears.
+        }
         dagScheduler.sc.stop()
         Stop
     }
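As one possible shape for the "later cleanup" mentioned in the thread above (my assumption, not something decided in this PR), Scala's NonFatal extractor sits between catching Exception and catching Throwable: it matches ordinary failures while letting fatal JVM errors propagate.

import scala.util.control.NonFatal

// Self-contained sketch, not the PR's code: doCancelAllJobs here is a stand-in that
// fails, and NonFatal catches it while still allowing OutOfMemoryError and similar
// fatal errors to escape.
object ShutdownSketch {
  def doCancelAllJobs(): Unit =
    throw new UnsupportedOperationException("backend cannot kill tasks")

  def main(args: Array[String]): Unit = {
    try {
      doCancelAllJobs()
    } catch {
      case NonFatal(t) => println(s"DAGScheduler failed to cancel all jobs: $t")
    }
    println("continuing with SparkContext shutdown")
  }
}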
Sorry Mark, one more question here -- can we move the job.listener.jobFailed(error) call from line 1041 to here in the "try" clause? It seems weird to tell the user the job has been cancelled when, in fact, it hasn't.
Hmmm... not sure that I agree. A job being cancelled, stages being cancelled, and tasks being cancelled are all different things. The expectation is that job cancellation will lead to cancellation of independent stages and their associated tasks; but if no stages and tasks get cancelled, it's probably still worthwhile for the information to be sent that the job itself was cancelled. I expect that eventually all of the backends will support task killing, so this whole no-kill path should never be hit. But moving the job cancellation notification within the try-to-cancelTasks block will result in multiple notifications that the parent job was cancelled -- one for each independent stage cancellation. Or am I misunderstanding what you are suggesting?
Ah, you're right that it doesn't make sense to add that here because it will be called for each stage. My intention was that if the job has running stages that don't get cancelled (because the task scheduler doesn't implement cancelTasks()), then we should not call job.listener.jobFailed() -- do you think that makes sense? Seems like the way to implement that would be to set a boolean flag here if the job can't be successfully cancelled, and then call jobFailed() 0 or 1 times at the end of this function depending on that flag.
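A minimal, self-contained sketch of the flag-based approach being described above; the helper names are stand-ins, not the PR's actual code. Cancellation failures flip a flag, and jobFailed is reported at most once, only when every running stage was actually cancelled.

// Hypothetical sketch of the suggestion above: record whether every running stage
// could be cancelled, and only report the job as failed if that succeeded.
object JobCancelSketch {
  def cancelTasksForStage(stageId: Int): Unit =
    // Stand-in for taskScheduler.cancelTasks; throws when the backend lacks killTask.
    throw new UnsupportedOperationException(s"killTask not supported (stage $stageId)")

  def failJobAndIndependentStages(runningStageIds: Seq[Int]): Unit = {
    var ableToCancelStages = true
    for (stageId <- runningStageIds) {
      try {
        cancelTasksForStage(stageId)
      } catch {
        case e: UnsupportedOperationException =>
          println(s"Could not cancel tasks for stage $stageId: $e")
          ableToCancelStages = false // at least one stage is still running
      }
    }
    // The equivalent of job.listener.jobFailed(error) is called zero or one times, here.
    if (ableToCancelStages) {
      println("job.listener.jobFailed(error) would be invoked exactly once here")
    }
  }

  def main(args: Array[String]): Unit = failJobAndIndependentStages(Seq(1, 2))
}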
What you suggest could be done, but there's a question of whether or not notification of cancellation of the job should be made regardless of whether any stages and tasks are successfully cancelled as a consequence. I don't really know how to answer that because I don't know how all of the listeners are using the notification or whether they are all expecting the same semantics.
@pwendell what do you think here?
Do we have the meaning of all the listener events fully documented someplace? Or perhaps that needs to be done in a separate PR and then DAGScheduler updated to match the documented expectation?
I think the correct semantics here are that we only trigger jobFailed if the job was successfully halted. This is mostly consumed downstream by tests, but it's also used by code supporting asynchronous actions and approximate results. In those cases, it would be better not to notify jobFailed if the cancellation doesn't succeed, because they both assume that the job has finished executing if that message is received. Separately, we should probably update the documentation in cancel to explain that it is a "best effort" method and will only be honored if supported by the underlying scheduler. Otherwise, we should say it will act as a no-op, i.e. it will act as if cancel was never called. With this approach downstream consumers will only have two cases to worry about (a job was cancelled or it wasn't) rather than a third case, where we say it was cancelled but it secretly actually wasn't.
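A sketch of what the suggested "best effort" wording might look like on the scheduler's cancellation hook; the trait and signature below are simplified assumptions for illustration, not Spark's actual TaskScheduler definition.

// Hypothetical illustration of the documentation change proposed above.
trait TaskSchedulerSketch {
  /**
   * Best-effort cancellation of all tasks in a stage. If the underlying
   * SchedulerBackend does not implement killTask, this has no effect and the
   * job behaves as if cancellation had never been requested; callers should
   * not assume the tasks have stopped, and jobFailed should not be reported.
   */
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
}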
Ok, either tonight or tomorrow I can update this PR to reflect that strategy, or you can go ahead and make the change @pwendell. Outside the immediate scope of this PR, what prevents Mesos from being able to kill tasks, and when do we expect that to change?
I'll just take care of it... thanks mark!
Task killing is not supported in the fine-grained mode on Mesos because, in that mode, we use Mesos's built-in support for all of the control-plane messages relating to tasks. So we'll have to figure out how to support killing tasks in that model. There are two questions: one is who actually sends the "kill" message to the executor, and the other is how we tell Mesos that the cores which were in use by the task are freed. In the course of normal operation that's handled by using the Mesos launchTask and sendStatusUpdate interfaces.