Skip to content

Commit

Permalink
Fixed the hanging in JobCancellationSuite.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Aug 20, 2014
1 parent b3e2eed commit c414c36
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,10 @@ class DAGScheduler(
}

private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
val stageInfo = stageIdToStage(task.stageId).latestInfo
listenerBus.post(SparkListenerTaskStart(task.stageId, stageInfo.attemptId, taskInfo))
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
submitWaitingStages()
}

Expand Down

0 comments on commit c414c36

Please sign in to comment.