[SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask #686

Closed · wants to merge 4 commits
19 changes: 14 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1062,10 +1062,15 @@ class DAGScheduler(
// This is the only job that uses this stage, so fail the stage if it is running.
val stage = stageIdToStage(stageId)
if (runningStages.contains(stage)) {
taskScheduler.cancelTasks(stageId, shouldInterruptThread)
val stageInfo = stageToInfos(stage)
stageInfo.stageFailed(failureReason)
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
taskScheduler.cancelTasks(stageId, shouldInterruptThread)
val stageInfo = stageToInfos(stage)
stageInfo.stageFailed(failureReason)
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
} catch {
case e: UnsupportedOperationException =>
logInfo(s"Could not cancel tasks for stage $stageId", e)
}
Contributor:

Sorry Mark, one more question here -- can we move the job.listener.jobFailed(error) call from line 1041 to here in the "try" clause? It seems weird to tell the user the job has been cancelled when, in fact, it hasn't.

Contributor Author:

Hmmm... not sure that I agree. A job being cancelled, stages being cancelled, and tasks being cancelled are all different things. The expectation is that job cancellation will lead to cancellation of independent stages and their associated tasks; but even if no stages or tasks get cancelled, it's probably still worthwhile to report that the job itself was cancelled. I expect that eventually all of the backends will support task killing, so this whole no-kill path should never be hit. But moving the job cancellation notification within the try-to-cancelTasks block will result in multiple notifications that the parent job was cancelled -- one for each independent stage cancellation. Or am I misunderstanding what you are suggesting?

Contributor:

Ah you're right that it doesn't make sense to add that here because it will be called for each stage. My intention was that if the job has running stages that don't get cancelled (because the task scheduler doesn't implement cancelTasks()), then we should not call job.listener.jobFailed() -- do you think that makes sense? Seems like the way to implement that would be to set a boolean flag here if the job can't be successfully cancelled, and then call jobFailed() 0 or 1 times at the end of this function depending on that flag.
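A minimal, self-contained sketch of the flag-based approach described above, with the scheduler interaction stubbed out; the names `ableToCancelStages`, `cancelStage`, and `jobFailed` are illustrative and not taken from the patch:

```scala
// Sketch only: cancelStage stands in for taskScheduler.cancelTasks, and jobFailed
// stands in for job.listener.jobFailed; this is not the real DAGScheduler code.
object CancelFlagSketch {
  def failJobAndIndependentStages(
      stageIds: Seq[Int],
      cancelStage: Int => Unit,            // may throw UnsupportedOperationException
      jobFailed: Exception => Unit,
      failureReason: String): Unit = {
    var ableToCancelStages = true
    for (stageId <- stageIds) {
      try {
        cancelStage(stageId)
      } catch {
        case _: UnsupportedOperationException =>
          // The backend cannot kill tasks, so this job was not really halted.
          ableToCancelStages = false
      }
    }
    // Notify the listener at most once, and only if every running stage was cancelled.
    if (ableToCancelStages) {
      jobFailed(new Exception(failureReason))
    }
  }
}
```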

Contributor Author:

What you suggest could be done, but there's a question of whether notification of the job's cancellation should be sent regardless of whether any stages and tasks are successfully cancelled as a consequence. I don't really know how to answer that, because I don't know how all of the listeners are using the notification or whether they all expect the same semantics.

Contributor:

@pwendell what do you think here?

Contributor Author:

Do we have the meaning of all the listener events fully documented someplace? Or perhaps that needs to be done in a separate PR and then DAGScheduler updated to match the documented expectation?

Contributor:

I think the correct semantics here are that we only trigger jobFailed if the job was successfully halted. This is mostly consumed downstream by tests, but it's also used by code supporting asynchronous actions and approximate results. In those cases, it would be better not to notify jobFailed if the cancellation doesn't succeed, because both assume that the job has finished executing if that message is received.

Separately, we should probably update the documentation for cancel to explain that it is a "best effort" method that only takes effect if the underlying scheduler supports it. Otherwise it will act as a no-op, i.e. as if cancel had never been called. With this approach downstream consumers only have two cases to worry about (a job was cancelled or it wasn't), rather than a third case where we say it was cancelled but it secretly wasn't.
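One possible wording for that "best effort" caveat, sketched as a doc comment on a stub method; the method body and the exact text here are hypothetical, not what was eventually committed:

```scala
object BestEffortDocSketch {
  /**
   * Cancel a running job. This is a best-effort operation: if the underlying
   * scheduler backend does not support killing tasks, running tasks are left
   * alone and the call behaves as a no-op, i.e. as if cancel had never been
   * called; in particular, jobFailed() is never delivered for the job.
   */
  def cancelJob(jobId: Int): Unit = {
    // Stub: the real method lives on DAGScheduler.
  }
}
```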

Contributor Author:

Ok, either tonight or tomorrow I can update this PR to reflect that strategy, or you can go ahead and make the change @pwendell. Outside the immediate scope of this PR, what prevents Mesos from being able to kill tasks, and when do we expect that to change?

Contributor:

I'll just take care of it... thanks mark!

Task killing is not supported in the fine-grained mode on Mesos because, in that mode, we use Mesos's built-in support for all of the control-plane messages relating to tasks. So we'll have to figure out how to support killing tasks in that model. There are two questions: one is who actually sends the "kill" message to the executor, and the other is how we tell Mesos that the cores which were in use by the task have been freed. In the course of normal operation that's handled by the Mesos launchTask and sendStatusUpdate interfaces.
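Purely as an illustration of one possible direction (nothing like this exists in the current MesosSchedulerBackend), a fine-grained backend could forward the kill through the Mesos SchedulerDriver and rely on the resulting TASK_KILLED status update to return the task's cores to the offer pool:

```scala
import org.apache.mesos.Protos.TaskID
import org.apache.mesos.SchedulerDriver

// Hypothetical sketch only: the class, method, and wiring are assumptions, not
// existing Spark code. Mesos delivers the kill to the executor, and the freed
// resources come back via the normal statusUpdate callback rather than a
// Spark-side message.
class FineGrainedKillSketch(driver: SchedulerDriver) {
  def killTask(taskId: Long, interruptThread: Boolean): Unit = {
    driver.killTask(TaskID.newBuilder().setValue(taskId.toString).build())
    // interruptThread cannot be honored from here; the executor side would have
    // to interrupt the task's thread when it receives the kill from Mesos.
  }
}
```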

}
}
}
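For context on where the UnsupportedOperationException in the hunk above comes from: cancelTasks ultimately asks the scheduler backend to kill each running task, and a backend that never overrides killTask throws. A simplified, self-contained approximation (stand-in names, not the actual Spark source):

```scala
// Simplified stand-ins for SchedulerBackend / TaskSchedulerImpl, showing why
// cancelTasks can throw when a backend does not implement task killing.
trait BackendSketch {
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException
}

class TaskSchedulerSketch(backend: BackendSketch) {
  // stageId -> running (taskId, executorId) pairs; placeholder data.
  private val runningTasks = Map(0 -> Seq((1L, "exec-1"), (2L, "exec-2")))

  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {
    // A backend that has not overridden killTask makes this blow up, which is
    // what the new try/catch in failJobAndIndependentStages guards against.
    runningTasks.getOrElse(stageId, Nil).foreach { case (tid, execId) =>
      backend.killTask(tid, execId, interruptThread)
    }
  }
}
```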
@@ -1155,7 +1160,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
case x: Exception =>
logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
.format(x.getMessage))
dagScheduler.doCancelAllJobs()
try {
dagScheduler.doCancelAllJobs()
} catch {
case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
Contributor:

What throwable do you get here? It looks like the UnsupportedOperationException is caught in DAGScheduler and not re-thrown?

Contributor Author:

Who knows just what the SchedulerBackend is going to throw now or in the future? UnsupportedOperationException is handled in failJobAndIndependentStages, but if something else is thrown out of the backend or doCancelAllJobs fails for any other reason, we'll just log it here and continue trying to shut down.

Contributor:

Oh, I see -- I didn't realize that this was part of the shutdown code; this makes sense now.

Contributor:

should probably not catch Throwable here (#715 is working towards reverting all of these)

Contributor Author:

@aarondav So shall I just back off to case e: Exception => here and let Throwable be picked up in a larger refactoring of Akka exception handling?

Contributor:

Alas, I'd have to go with @markhamstra's original solution (catching Throwable) on this one. I didn't include Akka-related code in my refactoring for precisely the reason that we don't have a great solution for it right now. We can factor it back to Exception in a later cleanup when such a solution appears.
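For reference, the usual middle ground between catching Exception and catching Throwable (should the direction of #715 be adopted here later) is scala.util.control.NonFatal, which catches ordinary exceptions but lets fatal VM errors propagate. A minimal sketch with the logging stubbed out; the wrapper object and method names are illustrative only:

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  def cancelAllJobsSafely(doCancelAllJobs: () => Unit): Unit =
    try {
      doCancelAllJobs()
    } catch {
      case NonFatal(t) =>
        // Stand-in for logError in this self-contained sketch.
        System.err.println(s"DAGScheduler failed to cancel all jobs: $t")
    }
}
```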

}
dagScheduler.sc.stop()
Stop
}
@@ -115,6 +115,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
sc = new SparkContext("local", "DAGSchedulerSuite")
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
failure = null
sc.addSparkListener(sparkListener)
taskSets.clear()
cancelledStages.clear()
@@ -314,6 +315,43 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}

test("job cancellation no-kill backend") {
// make sure that the DAGScheduler doesn't crash when the TaskScheduler
// doesn't implement killTask()
val noKillTaskScheduler = new TaskScheduler() {
override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start() = {}
override def stop() = {}
override def submitTasks(taskSet: TaskSet) = {}
override def cancelTasks(stageId: Int, interruptThread: Boolean) {
throw new UnsupportedOperationException
}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
}
val noKillScheduler = new DAGScheduler(
sc,
noKillTaskScheduler,
sc.listenerBus,
mapOutputTracker,
blockManagerMaster,
sc.env) {
override def runLocally(job: ActiveJob) {
// don't bother with the thread while unit testing
runLocallyWithinThread(job)
}
}
dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
val rdd = makeRdd(1, Nil)
val jobId = submit(rdd, Array(0))
cancel(jobId)
assert(failure.getMessage === s"Job $jobId cancelled ")
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assertDataStructuresEmpty
}

test("run trivial shuffle") {
val shuffleMapRdd = makeRdd(2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)