Skip to content

Commit

Permalink
undo effect on totalResultSize and calculatedTasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Hieu Huynh authored and Hieu Huynh committed Jul 19, 2018
1 parent 7d9e4bb commit 2c7d33d
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.scheduler.SchedulingMode._
import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils}
import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils}
import org.apache.spark.util.collection.MedianHeap

/**
Expand Down Expand Up @@ -724,7 +724,15 @@ private[spark] class TaskSetManager(
val info = taskInfos(tid)
val index = info.index
// Check if any other attempt succeeded before this and this attempt has not been handled
if (successful(index) && killedByOtherAttempt(index)) {
if (successful(index) && killedByOtherAttempt.contains(tid)) {
calculatedTasks -= 1

val resultSizeAcc = result.accumUpdates.find(a =>
a.name == Some(InternalAccumulator.RESULT_SIZE))
if (resultSizeAcc.isDefined) {
totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
}

handleFailedTask(tid, TaskState.KILLED,
TaskKilled("Finish but did not commit due to another attempt succeeded"))
return
Expand Down

0 comments on commit 2c7d33d

Please sign in to comment.