diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 8136d8b3aac55..c1bbb237eec7b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.SchedulingMode._
-import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils}
+import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils}
 import org.apache.spark.util.collection.MedianHeap
 
 /**
@@ -724,7 +724,15 @@ private[spark] class TaskSetManager(
     val info = taskInfos(tid)
     val index = info.index
     // Check if any other attempt succeeded before this and this attempt has not been handled
-    if (successful(index) && killedByOtherAttempt(index)) {
+    if (successful(index) && killedByOtherAttempt.contains(tid)) {
+      calculatedTasks -= 1
+
+      val resultSizeAcc = result.accumUpdates.find(a =>
+        a.name == Some(InternalAccumulator.RESULT_SIZE))
+      if (resultSizeAcc.isDefined) {
+        totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
+      }
+
       handleFailedTask(tid, TaskState.KILLED,
         TaskKilled("Finish but did not commit due to another attempt succeeded"))
       return
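For context: `calculatedTasks` and `totalResultSize` are the counters that `TaskSetManager` bumps as each task result is fetched and charged against `spark.driver.maxResultSize`. This patch rolls both back when a fetched result turns out to belong to a speculative attempt that lost to another successful attempt, so the discarded result no longer counts toward the driver's cap. The following is a minimal, self-contained Scala sketch of that charge/rollback pattern; the object and method names are illustrative, not Spark's actual API.

```scala
// Illustrative sketch (not Spark's actual class) of the result-size
// bookkeeping this patch reverses.
object ResultSizeAccounting {
  private var totalResultSize = 0L      // bytes of task results charged so far
  private var calculatedTasks = 0       // number of results charged
  private val maxResultSize = 1L << 30  // stand-in for spark.driver.maxResultSize

  // Charge a fetched result against the cap; returns false once the
  // cap is exceeded (mirrors the role of canFetchMoreResults).
  def charge(size: Long): Boolean = {
    totalResultSize += size
    calculatedTasks += 1
    totalResultSize <= maxResultSize
  }

  // Reverse a charge for a result that is discarded because another
  // attempt already succeeded (mirrors the rollback this patch adds).
  def rollback(size: Long): Unit = {
    totalResultSize -= size
    calculatedTasks -= 1
  }
}
```

The rollback keeps the accounting symmetric: only results that are actually kept remain charged, so a task set with many killed speculative attempts is not spuriously failed for exceeding `spark.driver.maxResultSize`.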