[SPARK-2298] Encode stage attempt in SparkListener & UI.
rxin committed Aug 18, 2014
1 parent 3a5962f commit 4e5faa2
Showing 16 changed files with 141 additions and 102 deletions.
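In short: stage attempts become first-class in the listener API and the web UI. SparkListenerTaskStart, SparkListenerTaskEnd, and SparkListenerExecutorMetricsUpdate now carry the stage attempt id, StageInfo gains an attemptId field, and the UI keys its per-stage bookkeeping by (stageId, attemptId), so a retried stage no longer clobbers data from an earlier attempt. As a rough sketch, a hypothetical listener (not part of this commit) could consume the new field like this:

```scala
import scala.collection.mutable
import org.apache.spark.scheduler._

// Hypothetical listener keeping per-attempt task counts, keyed the same way
// the UI now keys its data: (stageId, stageAttemptId).
class AttemptAwareListener extends SparkListener {
  private val finishedTasks =
    mutable.HashMap[(Int, Int), Int]().withDefaultValue(0)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    finishedTasks((taskEnd.stageId, taskEnd.stageAttemptId)) += 1
  }
}
```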
17 changes: 11 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -164,7 +164,7 @@ class DAGScheduler(
*/
def executorHeartbeatReceived(
execId: String,
taskMetrics: Array[(Long, Int, TaskMetrics)], // (taskId, stageId, metrics)
taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stageAttemptId, metrics)
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
implicit val timeout = Timeout(600 seconds)
@@ -677,7 +677,8 @@ class DAGScheduler(
}

private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
val stageInfo = stageIdToStage(task.stageId).info
listenerBus.post(SparkListenerTaskStart(task.stageId, stageInfo.attemptId, taskInfo))
submitWaitingStages()
}

@@ -843,6 +844,8 @@ }
}
}

stage.info = StageInfo.fromStage(stage, Some(tasks.size))

if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
@@ -887,13 +890,14 @@ class DAGScheduler(
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
val stageId = task.stageId
val stageInfo = stageIdToStage(task.stageId).info
val taskType = Utils.getFormattedClassName(task)

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
listenerBus.post(SparkListenerTaskEnd(stageId, stageInfo.attemptId, taskType, event.reason,
event.taskInfo, event.taskMetrics))
}

if (!stageIdToStage.contains(task.stageId)) {
@@ -935,8 +939,8 @@ class DAGScheduler(
logError(s"Failed to update accumulators for $task", e)
}
}
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
listenerBus.post(SparkListenerTaskEnd(stageId, stageInfo.attemptId, taskType, event.reason,
event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
task match {
case rt: ResultTask[_, _] =>
@@ -1029,6 +1033,7 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
val failedStage = stageIdToStage(task.stageId)
listenerBus.post(SparkListenerStageCompleted(failedStage.info))
runningStages -= failedStage
// TODO: Cancel running tasks in the stage
logInfo("Marking " + failedStage + " (" + failedStage.name +
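Two DAGScheduler details to note: submitMissingTasks now regenerates stage.info so each submission captures a fresh attempt id and the actual task count, and the FetchFailed branch posts SparkListenerStageCompleted for the failed stage so the UI can close out that attempt before the retry arrives. A minimal, self-contained simulation of the resulting lifecycle (simplified types, not Spark's own):

```scala
import scala.collection.mutable

// A FetchFailed closes out attempt 0; the resubmission arrives as attempt 1
// under a distinct key instead of overwriting the old entry.
object RetrySimulation extends App {
  case class Attempt(stageId: Int, attemptId: Int)
  val active = mutable.Set[Attempt]()

  active += Attempt(3, 0)  // SparkListenerStageSubmitted, attempt 0
  active -= Attempt(3, 0)  // FetchFailed => SparkListenerStageCompleted posted
  active += Attempt(3, 1)  // DAGScheduler resubmits with a fresh attempt id

  assert(active == Set(Attempt(3, 1)))
  println(s"active stage attempts: $active")
}
```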
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -39,14 +39,16 @@ case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends SparkListenerEvent
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskEnd(
stageId: Int,
stageAttemptId: Int,
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
@@ -75,10 +77,15 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

/**
* Periodic updates from executors.
* @param execId executor id
* @param taskMetrics sequence of (task id, stage id, stage attempt, metrics)
*/
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
execId: String,
taskMetrics: Seq[(Long, Int, TaskMetrics)])
taskMetrics: Seq[(Long, Int, Int, TaskMetrics)])
extends SparkListenerEvent

@DeveloperApi
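Because these events are case classes, the added stageAttemptId changes their constructor and extractor arity, so listeners that build or pattern-match the old shapes must be updated. A hedged sketch of matching the new shapes (describe is illustrative, not part of Spark):

```scala
import org.apache.spark.scheduler._

// The old SparkListenerTaskStart had two fields and SparkListenerTaskEnd had
// five; both now take the stage attempt id right after the stage id.
def describe(event: SparkListenerEvent): String = event match {
  case SparkListenerTaskStart(stageId, stageAttemptId, info) =>
    s"task ${info.taskId} started in stage $stageId (attempt $stageAttemptId)"
  case SparkListenerTaskEnd(stageId, stageAttemptId, taskType, reason, _, _) =>
    s"$taskType ended in stage $stageId (attempt $stageAttemptId): $reason"
  case _ =>
    "other event"
}
```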
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -116,6 +116,7 @@ private[spark] class Stage(
}
}

/** Return a new attempt id, starting with 0. */
def newAttemptId(): Int = {
val id = nextAttemptId
nextAttemptId += 1
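newAttemptId is just a per-stage counter: the first attempt is 0 and each resubmission takes the next id. A standalone model of that behavior (StageAttempts is a stand-in, not Spark's Stage):

```scala
// Per-stage attempt numbering in isolation.
object AttemptIdDemo extends App {
  class StageAttempts {
    private var nextAttemptId = 0
    def newAttemptId(): Int = { val id = nextAttemptId; nextAttemptId += 1; id }
  }

  val s = new StageAttempts
  assert(s.newAttemptId() == 0)  // the original attempt
  assert(s.newAttemptId() == 1)  // the first retry
}
```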
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.storage.RDDInfo
@DeveloperApi
class StageInfo(
val stageId: Int,
val attemptId: Int,
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
@@ -56,9 +57,15 @@ private[spark] object StageInfo {
* shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
* sequence of narrow dependencies should also be associated with this Stage.
*/
def fromStage(stage: Stage): StageInfo = {
def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
new StageInfo(
stage.id,
stage.attemptId,
stage.name,
numTasks.getOrElse(stage.numTasks),
rddInfos,
stage.details)
}
}
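The optional numTasks lets the DAGScheduler record how many tasks a given submission actually launches (a resubmitted stage may recompute only its missing partitions), while callers that omit it fall back to the stage's full task count. The fallback in isolation, with illustrative names:

```scala
// effectiveNumTasks mirrors numTasks.getOrElse(stage.numTasks) above.
def effectiveNumTasks(stageNumTasks: Int, numTasks: Option[Int] = None): Int =
  numTasks.getOrElse(stageNumTasks)

assert(effectiveNumTasks(10) == 10)          // fromStage(stage)
assert(effectiveNumTasks(10, Some(3)) == 3)  // fromStage(stage, Some(tasks.size))
```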
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -333,12 +333,12 @@ private[spark] class TaskSchedulerImpl(
execId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId): Boolean = {
val metricsWithStageIds = taskMetrics.flatMap {
case (id, metrics) => {

val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
taskMetrics.flatMap { case (id, metrics) =>
taskIdToTaskSetId.get(id)
.flatMap(activeTaskSets.get)
.map(_.stageId)
.map(x => (id, x, metrics))
.map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
}
}
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
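The heartbeat path now resolves each reported task id to its task set under the scheduler lock and tags the metrics with both the stage id and the task set's attempt; ids that no longer map to an active task set simply drop out of the flatMap. A self-contained sketch with Spark's types stubbed out:

```scala
// TaskSetManagerStub stands in for Spark's TaskSetManager.
case class TaskSetManagerStub(stageId: Int, attempt: Int)

// Resolve each (taskId, metrics) pair to its task set and attach
// (stageId, attempt); unknown task ids are silently dropped.
def withStageAndAttempt[M](
    taskMetrics: Seq[(Long, M)],
    taskIdToTaskSetId: Map[Long, String],
    activeTaskSets: Map[String, TaskSetManagerStub]): Seq[(Long, Int, Int, M)] =
  taskMetrics.flatMap { case (id, metrics) =>
    taskIdToTaskSetId.get(id)
      .flatMap(activeTaskSets.get)
      .map(mgr => (id, mgr.stageId, mgr.attempt, metrics))
  }

// A heartbeat for an unknown task id (99) falls out of the result:
assert(withStageAndAttempt(
  Seq(7L -> "metrics", 99L -> "stale"),
  Map(7L -> "3.1"),
  Map("3.1" -> TaskSetManagerStub(stageId = 3, attempt = 1))
) == Seq((7L, 3, 1, "metrics")))
```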
4 changes: 0 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -31,9 +31,5 @@ private[spark] class TaskSet(
val properties: Properties) {
val id: String = stageId + "." + attempt

def kill(interruptThread: Boolean) {
tasks.foreach(_.kill(interruptThread))
}

override def toString: String = "TaskSet " + id
}
core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -24,8 +24,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils

/** Page showing executor summary */
private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
/** Stage summary grouped by executors. */
private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobProgressTab) {
private val listener = parent.listener

def toNodeSeq: Seq[Node] = {
@@ -65,7 +65,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
executorIdToAddress.put(executorId, address)
}

listener.stageIdToData.get(stageId) match {
listener.stageIdToData.get((stageId, stageAttemptId)) match {
case Some(stageData: StageUIData) =>
stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
<tr>
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -43,13 +43,16 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// How many stages to remember
val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)

val activeStages = HashMap[Int, StageInfo]()
// Map from stageId to StageInfo
val activeStages = new HashMap[Int, StageInfo]

// Map from (stageId, attemptId) to StageInfo
val stageIdToData = new HashMap[(Int, Int), StageUIData]

val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()

val stageIdToData = new HashMap[Int, StageUIData]

val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
val poolToActiveStages = HashMap[String, HashMap[(Int, Int), StageInfo]]()

val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()

@@ -59,18 +62,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stageInfo
val stageId = stage.stageId
val stageData = stageIdToData.getOrElseUpdate(stageId, {
logWarning("Stage completed for unknown stage " + stageId)
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
logWarning("Stage completed for unknown stage " + stage.stageId)
new StageUIData
})

for ((id, info) <- stageCompleted.stageInfo.accumulables) {
stageData.accumulables(id) = info
}

poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
activeStages.remove(stageId)
poolToActiveStages.get(stageData.schedulingPool).foreach { hashMap =>
hashMap.remove((stage.stageId, stage.attemptId))
}
activeStages.remove(stage.stageId)
if (stage.failureReason.isEmpty) {
completedStages += stage
trimIfNecessary(completedStages)
@@ -84,7 +88,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) }
stages.trimStart(toRemove)
}
}
@@ -98,21 +102,21 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)

val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
stageData.schedulingPool = poolName

stageData.description = Option(stageSubmitted.properties).flatMap {
p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}

val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
stages(stage.stageId) = stage
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[(Int, Int), StageInfo]())
stages((stage.stageId, stage.attemptId)) = stage
}

override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
@@ -129,7 +133,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
logWarning("Task end for unknown stage " + taskEnd.stageId)
new StageUIData
})
@@ -222,8 +226,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}

override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) {
for ((taskId, sid, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
val stageData = stageIdToData.getOrElseUpdate(sid, {
for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), {
logWarning("Metrics update for task in unknown stage " + sid)
new StageUIData
})
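The structural change in the listener is the key type: all per-stage UI state moves from stageId to the composite (stageId, attemptId), including the per-pool active-stage maps. In miniature, with StageUIData stubbed as a String:

```scala
import scala.collection.mutable.HashMap

// Attempt 1 of stage 3 gets its own entry instead of overwriting attempt 0.
val stageIdToData = new HashMap[(Int, Int), String]
stageIdToData((3, 0)) = "task data from attempt 0"
stageIdToData((3, 1)) = "task data from attempt 1"
assert(stageIdToData.size == 2)
```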
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -34,7 +34,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
}

private def poolTable(
makeRow: (Schedulable, HashMap[String, HashMap[Int, StageInfo]]) => Seq[Node],
makeRow: (Schedulable, HashMap[String, HashMap[(Int, Int), StageInfo]]) => Seq[Node],
rows: Seq[Schedulable]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable table-fixed">
<thead>
@@ -53,7 +53,7 @@

private def poolRow(
p: Schedulable,
poolToActiveStages: HashMap[String, HashMap[Int, StageInfo]]): Seq[Node] = {
poolToActiveStages: HashMap[String, HashMap[(Int, Int), StageInfo]]): Seq[Node] = {
val activeStages = poolToActiveStages.get(p.name) match {
case Some(stages) => stages.size
case None => 0
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -34,7 +34,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val stageId = request.getParameter("id").toInt
val stageDataOption = listener.stageIdToData.get(stageId)
val stageAttemptId = request.getParameter("attempt").toInt
val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId))

if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
val content =
@@ -49,7 +50,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)

val numCompleted = tasks.count(_.taskInfo.finished)
val accumulables = listener.stageIdToData(stageId).accumulables
val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables
val hasInput = stageData.inputBytes > 0
val hasShuffleRead = stageData.shuffleReadBytes > 0
val hasShuffleWrite = stageData.shuffleWriteBytes > 0
@@ -211,7 +212,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def quantileRow(data: Seq[Node]): Seq[Node] = <tr>{data}</tr>
Some(UIUtils.listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
val executorTable = new ExecutorTable(stageId, parent)

val executorTable = new ExecutorTable(stageId, stageAttemptId, parent)

val maybeAccumulableTable: Seq[Node] =
if (accumulables.size > 0) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -97,8 +97,8 @@ private[ui] class StageTableBase(
}
// scalastyle:on

val nameLinkUri ="%s/stages/stage?id=%s"
.format(UIUtils.prependBaseUri(parent.basePath), s.stageId)
val nameLinkUri ="%s/stages/stage?id=%s&attempt=%s"
.format(UIUtils.prependBaseUri(parent.basePath), s.stageId, s.attemptId)
val nameLink = <a href={nameLinkUri}>{s.name}</a>

val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
@@ -121,7 +121,7 @@
}

val stageDesc = for {
stageData <- listener.stageIdToData.get(s.stageId)
stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
desc <- stageData.description
} yield {
<div><em>{desc}</em></div>
@@ -131,7 +131,7 @@
}

protected def stageRow(s: StageInfo): Seq[Node] = {
val stageDataOption = listener.stageIdToData.get(s.stageId)
val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId))
if (stageDataOption.isEmpty) {
return <td>{s.stageId}</td><td>No data available for this stage</td>
}
@@ -154,7 +154,11 @@
val shuffleWrite = stageData.shuffleWriteBytes
val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""

<td>{s.stageId}</td> ++
{if (s.attemptId > 0) {
<td>{s.stageId} (attempt {s.attemptId})</td>
} else {
<td>{s.stageId}</td>
}} ++
{if (isFairScheduler) {
<td>
<a href={"%s/stages/pool?poolname=%s"
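The stage name now links to a URL that carries the attempt, which StagePage reads back with request.getParameter("attempt"), and rows for attempts after the first are labeled "(attempt N)". The link format in isolation (stageLink is illustrative, not Spark's API):

```scala
// Mirrors the "%s/stages/stage?id=%s&attempt=%s" format string above.
def stageLink(basePath: String, stageId: Int, attemptId: Int): String =
  "%s/stages/stage?id=%s&attempt=%s".format(basePath, stageId, attemptId)

assert(stageLink("", 3, 1) == "/stages/stage?id=3&attempt=1")
```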