[SPARK-24755][Core] Executor loss can cause task to not be resubmitted #21729

Closed · wants to merge 7 commits
@@ -84,10 +84,10 @@ private[spark] class TaskSetManager(
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
 
-  // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
-  // this happened while we set the `spark.speculation` to true. The task killed by others
-  // should not resubmit while executor lost.
-  private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
+  // Add the tid of a task to this HashSet when the task is killed by another attempt.
+  // This can happen when `spark.speculation` is enabled. A task killed by another
+  // attempt must not be resubmitted when its executor is lost.
+  private val killedByOtherAttempt = new HashSet[Long]
 
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   private[scheduler] var tasksSuccessful = 0
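Why the key type changes: a task index is shared by every attempt of that task, while a tid (task attempt id) is unique per attempt, so only a tid can record which attempt was killed. A minimal standalone sketch of the distinction (plain Scala with hypothetical values mirroring the test below; these are not the real TaskSetManager fields):

    import scala.collection.mutable.HashSet

    // Task index 2 runs twice: attempt 0 as tid 2 and speculative attempt 1 as tid 4.
    val killedByIndex = new Array[Boolean](4) // old scheme: one flag per task index
    val killedByTid = new HashSet[Long]       // new scheme: one entry per killed attempt

    // tid 4 succeeds, so tid 2 is killed:
    killedByIndex(2) = true // old: the flag now covers BOTH attempts of index 2
    killedByTid += 2L       // new: only the killed attempt (tid 2) is recorded

    assert(killedByIndex(2))          // old: cannot tell which attempt was killed
    assert(killedByTid.contains(2L))  // new: tid 2 was killed ...
    assert(!killedByTid.contains(4L)) // ... and the successful tid 4 was not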
@@ -735,7 +735,7 @@ private[spark] class TaskSetManager(
         logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
           s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
           s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
-        killedByOtherAttempt(index) = true
+        killedByOtherAttempt += attemptInfo.taskId
         sched.backend.killTask(
           attemptInfo.taskId,
           attemptInfo.executorId,
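At this kill site, `info` is the attempt that just succeeded and `attemptInfo` is the copy being killed, so the tid that lands in the set is always the loser's, never the winner's. A hedged sketch of that invariant using the values from the test below (the case class is illustrative, not Spark's TaskInfo):

    import scala.collection.mutable.HashSet

    case class Attempt(taskId: Long, attemptNumber: Int, executorId: String)
    val info = Attempt(taskId = 4, attemptNumber = 1, executorId = "exec2")        // succeeded
    val attemptInfo = Attempt(taskId = 2, attemptNumber = 0, executorId = "exec3") // being killed

    val killedByOtherAttempt = new HashSet[Long]
    killedByOtherAttempt += attemptInfo.taskId // record the killed attempt's tid only

    assert(killedByOtherAttempt.contains(2L) && !killedByOtherAttempt.contains(4L))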
@@ -947,7 +947,7 @@ private[spark] class TaskSetManager(
         && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
-        if (successful(index) && !killedByOtherAttempt(index)) {
+        if (successful(index) && !killedByOtherAttempt.contains(tid)) {
           successful(index) = false
           copiesRunning(index) -= 1
           tasksSuccessful -= 1
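This executorLost check is where SPARK-24755 bites. With the old per-index flag, index 2 was marked killed as soon as tid 2 lost the speculation race, so when exec2 (which held the successful tid 4's map output) was later lost, the condition was false and the output was never recomputed. Keying by tid asks the right question: was the attempt on the lost executor itself the one that was killed? A sketch replaying the scenario under those assumptions (plain Scala, not the real scheduler state):

    import scala.collection.mutable.HashSet

    val index = 2                       // task index with two attempts
    val killedByTid = HashSet(2L)       // tid 2 (on exec3) was killed by the other attempt
    val tidOnLostExecutor = 4L          // tid 4 (on exec2) succeeded, then exec2 is lost
    val successful = Array(true, true, true, true)
    val oldKilledByIndex = Array(false, false, true, false) // flag set when tid 2 was killed

    // Old condition: never true for index 2, so the lost output is not recomputed.
    val resubmitOld = successful(index) && !oldKilledByIndex(index)
    // New condition: tid 4 was not killed, so task 2 is correctly resubmitted.
    val resubmitNew = successful(index) && !killedByTid.contains(tidOnLostExecutor)

    assert(!resubmitOld) // the bug
    assert(resubmitNew)  // the fix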
@@ -1414,6 +1414,118 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     taskSetManager2.checkSpeculatableTasks(0)
   }
 
+
+  test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+    // Set the speculation multiplier to 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+
+    sc.conf.set("spark.speculation.quantile", "0.5")
+    sc.conf.set("spark.speculation", "true")
+
+    var killTaskCalled = false
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+      ("exec2", "host2"), ("exec3", "host3"))
+    sched.initialize(new FakeSchedulerBackend() {
+      override def killTask(
+          taskId: Long,
+          executorId: String,
+          interruptThread: Boolean,
+          reason: String): Unit = {
+        // Check that this is the only killTask event in this case, which is
+        // triggered by the completion of task 2.1.
+        assert(taskId === 2)
+        assert(executorId === "exec3")
+        assert(interruptThread)
+        assert(reason === "another attempt succeeded")
+        killTaskCalled = true
+      }
+    })
+
+    // Keep track of the indices of tasks that are resubmitted,
+    // so that the test can check that tasks are resubmitted correctly.
+    val resubmittedTasks = new mutable.HashSet[Int]
+    val dagScheduler = new FakeDAGScheduler(sc, sched) {
+      override def taskEnded(
+          task: Task[_],
+          reason: TaskEndReason,
+          result: Any,
+          accumUpdates: Seq[AccumulatorV2[_, _]],
+          taskInfo: TaskInfo): Unit = {
+        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+        reason match {
+          case Resubmitted => resubmittedTasks += taskInfo.index
+          case _ =>
+        }
+      }
+    }
+    sched.dagScheduler.stop()
+    sched.setDAGScheduler(dagScheduler)
+
+    val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host3", "exec3")),
+      Seq(TaskLocation("host2", "exec2")))
+
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 4 tasks to start
+    for ((exec, host) <- Seq(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec3" -> "host3",
+        "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === exec)
+      // An extra assert to make sure task 2.0 is running on exec3
+      if (task.index == 2) {
+        assert(task.attemptNumber === 0)
+        assert(task.executorId === "exec3")
+      }
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete two tasks and leave the other two running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+
+    // Offer a resource to start the speculative attempt for the running task 2.0
+    val taskOption = manager.resourceOffer("exec2", "host2", ANY)
+    assert(taskOption.isDefined)
+    val task4 = taskOption.get
+    assert(task4.index === 2)
+    assert(task4.taskId === 4)
+    assert(task4.executorId === "exec2")
+    assert(task4.attemptNumber === 1)
+    // Complete the speculative attempt for the running task
+    manager.handleSuccessfulTask(4, createTaskResult(2, accumUpdatesByTask(2)))
+    // Make sure schedBackend.killTask(2, "exec3", true, "another attempt succeeded") gets called
+    assert(killTaskCalled)
+
+    assert(resubmittedTasks.isEmpty)
+    // host2 is lost, so the map output written by task 4 (task 2's successful attempt) is gone
+    manager.executorLost("exec2", "host2", SlaveLost())
+    // Make sure the task with index 2 is resubmitted
+    assert(resubmittedTasks.contains(2))
+
+  }
+
   private def createTaskResult(
     id: Int,
     accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {