Skip to content

Commit

Permalink
apache#47 ShuffleMapTask ignore spark.driver.maxResultSize
Browse files Browse the repository at this point in the history
  • Loading branch information
7mming7 committed Sep 23, 2019
1 parent 1667db0 commit 2e4637c
Showing 1 changed file with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,29 +734,39 @@ private[spark] class TaskSetManager(
}
}

/**
 * Subtracts this task's serialized result size from the scheduler's running
 * `totalResultSize` total, undoing the accounting done when the result was
 * first accepted (used when a task's result is discarded, e.g. because another
 * attempt already succeeded or the task is a ShuffleMapTask whose result does
 * not count against spark.driver.maxResultSize).
 *
 * @param result the direct task result whose RESULT_SIZE accumulator update,
 *               if present, is removed from the running total
 */
private def releaseTotalResultSize(result: DirectTaskResult[_]): Unit = {
  // Locate the internal RESULT_SIZE accumulator among the task's accumulator
  // updates; `name` is an Option[String], so use `contains` rather than
  // comparing against Some(...).
  result.accumUpdates
    .find(_.name.contains(InternalAccumulator.RESULT_SIZE))
    .foreach { acc =>
      // The RESULT_SIZE accumulator is always a LongAccumulator, so this
      // cast is safe by construction of the internal accumulators.
      totalResultSize -= acc.asInstanceOf[LongAccumulator].value
    }
}

/**
* Marks a task as successful and notifies the DAGScheduler that the task has ended.
*/
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
val task = tasks(index)
// Check if any other attempt succeeded before this and this attempt has not been handled
if (successful(index) && killedByOtherAttempt.contains(tid)) {
// Undo the effect on calculatedTasks and totalResultSize made earlier when
// checking if can fetch more results
calculatedTasks -= 1
val resultSizeAcc = result.accumUpdates.find(a =>
a.name == Some(InternalAccumulator.RESULT_SIZE))
if (resultSizeAcc.isDefined) {
totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
}
releaseTotalResultSize(result)

// Handle this task as a killed task
handleFailedTask(tid, TaskState.KILLED,
TaskKilled("Finish but did not commit due to another attempt succeeded"))
return
}

task match {
case smt: ShuffleMapTask => releaseTotalResultSize(result)
case _ => None
}

info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
if (speculationEnabled) {
successfulTaskDurations.insert(info.duration)
Expand Down

0 comments on commit 2e4637c

Please sign in to comment.