Skip to content

Commit

Permalink
Potential solution to posting listener events
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Aug 4, 2014
1 parent 0bb0e33 commit 9860c55
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -792,9 +792,14 @@ class DAGScheduler(
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)

if (!stageIdToStage.contains(task.stageId)) {
// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
}

if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
return
}
Expand Down Expand Up @@ -829,6 +834,8 @@ class DAGScheduler(
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
}
pendingTasks(stage) -= task
task match {
Expand Down Expand Up @@ -959,8 +966,6 @@ class DAGScheduler(
// Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
// will abort the job.
}
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
submitWaitingStages()
}

Expand Down

0 comments on commit 9860c55

Please sign in to comment.