Commit

Fixed issue and added unit test
Hieu Huynh authored and Hieu Huynh committed Jun 28, 2018
1 parent b070ded commit 8f7d981
Showing 2 changed files with 66 additions and 0 deletions.
@@ -723,6 +723,12 @@ private[spark] class TaskSetManager(
  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
    val info = taskInfos(tid)
    val index = info.index
    // Check whether any other attempt already succeeded before this one and this attempt
    // has not been handled yet
    if (successful(index) && killedByOtherAttempt(index)) {
      handleFailedTask(tid, TaskState.KILLED, TaskKilled("another attempt succeeded"))
      return
    }

    info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
    if (speculationEnabled) {
      successfulTaskDurations.insert(info.duration)
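
For readers skimming the diff, the sketch below is a minimal, self-contained Scala model of the race this guard closes (the object and method names are invented for illustration and are not Spark APIs): once one attempt of a task succeeds, the partition is marked done and the sibling attempt is killed, so a "success" message that later arrives from that killed sibling has to be routed to the kill/failure path instead of being counted as a second success.

// Toy model of the late-success race, not Spark code.
object LateSuccessSketch {
  // Per-task-index bookkeeping, mirroring the two flags consulted by the guard above.
  val successful = Array.fill(4)(false)
  val killedByOtherAttempt = Array.fill(4)(false)

  def handleSuccess(index: Int, attempt: Int): String = {
    if (successful(index) && killedByOtherAttempt(index)) {
      // A sibling attempt already won and this attempt was killed:
      // treat the late message as a kill, not a second success.
      s"task $index attempt $attempt -> handled as killed"
    } else {
      successful(index) = true
      // The winning attempt asks the backend to kill the other running copy;
      // here we only record that fact for the task index.
      killedByOtherAttempt(index) = true
      s"task $index attempt $attempt -> handled as success"
    }
  }

  def main(args: Array[String]): Unit = {
    println(handleSuccess(index = 3, attempt = 0)) // original attempt finishes first
    println(handleSuccess(index = 3, attempt = 1)) // late result from the killed speculative copy
  }
}

The second call takes the "handled as killed" branch, which is exactly the behavior the new unit test below asserts via taskInfos(4).killed.
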
@@ -1371,4 +1371,64 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
    val valueSer = SparkEnv.get.serializer.newInstance()
    new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
  }

test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success"){
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(4)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation", "true")
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}

    // Offer resources for 4 tasks to start
    for ((k, v) <- List(
        "exec1" -> "host1",
        "exec1" -> "host1",
        "exec2" -> "host2",
        "exec2" -> "host2")) {
      val taskOption = manager.resourceOffer(k, v, NO_PREF)
      assert(taskOption.isDefined)
      val task = taskOption.get
      assert(task.executorId === k)
    }
    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
    clock.advance(1)
    // Complete three of the tasks and leave one task running
    for (id <- Set(0, 1, 2)) {
      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
      assert(sched.endedTasks(id) === Success)
    }

    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
    // > 0ms, so advance the clock by 1ms here.
    clock.advance(1)
    assert(manager.checkSpeculatableTasks(0))
    assert(sched.speculativeTasks.toSet === Set(3))

    // Offer a resource to start the speculative attempt for the running task
    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
    assert(taskOption5.isDefined)
    val task5 = taskOption5.get
    assert(task5.index === 3)
    assert(task5.taskId === 4)
    assert(task5.executorId === "exec1")
    assert(task5.attemptNumber === 1)
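    // Swap in a Mockito mock backend so the kill request for the other running attempt
    // can be verified with verify(...) below.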
    sched.backend = mock(classOf[SchedulerBackend])

    // Complete one attempt for the running task
    manager.handleSuccessfulTask(3, createTaskResult(3, accumUpdatesByTask(3)))
    // Verify that it kills the other running attempt
    verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
    // Complete the other attempt for the running task
    manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))

    assert(manager.taskInfos(3).successful == true)
    assert(manager.taskInfos(4).killed == true)
  }
}
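
To exercise only the new case locally, the usual Spark sbt workflow should cover it, for example build/sbt "core/testOnly *TaskSetManagerSuite -- -z SPARK-13343"; the exact invocation is an assumption about the standard Spark build setup rather than part of this commit.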
