From 4e5faa2d619ec03d24b268d6db0d96cbb07e1730 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 23 Jul 2014 02:09:58 -0700 Subject: [PATCH 1/7] [SPARK-2298] Encode stage attempt in SparkListener & UI. --- .../apache/spark/scheduler/DAGScheduler.scala | 17 +++-- .../spark/scheduler/SparkListener.scala | 11 ++- .../org/apache/spark/scheduler/Stage.scala | 1 + .../apache/spark/scheduler/StageInfo.scala | 11 ++- .../spark/scheduler/TaskSchedulerImpl.scala | 8 +-- .../org/apache/spark/scheduler/TaskSet.scala | 4 -- .../apache/spark/ui/jobs/ExecutorTable.scala | 6 +- .../spark/ui/jobs/JobProgressListener.scala | 38 ++++++----- .../org/apache/spark/ui/jobs/PoolTable.scala | 4 +- .../org/apache/spark/ui/jobs/StagePage.scala | 8 ++- .../org/apache/spark/ui/jobs/StageTable.scala | 14 ++-- .../org/apache/spark/util/JsonProtocol.scala | 12 +++- .../storage/StorageStatusListenerSuite.scala | 17 ++--- .../ui/jobs/JobProgressListenerSuite.scala | 68 ++++++++++--------- .../spark/ui/storage/StorageTabSuite.scala | 16 ++--- .../apache/spark/util/JsonProtocolSuite.scala | 8 +-- 16 files changed, 141 insertions(+), 102 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b86cfbfa48fbe..f2652de086709 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -164,7 +164,7 @@ class DAGScheduler( */ def executorHeartbeatReceived( execId: String, - taskMetrics: Array[(Long, Int, TaskMetrics)], // (taskId, stageId, metrics) + taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics) blockManagerId: BlockManagerId): Boolean = { listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics)) implicit val timeout = Timeout(600 seconds) @@ -677,7 +677,8 @@ class DAGScheduler( } private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { - listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) + val stageInfo = stageIdToStage(task.stageId).info + listenerBus.post(SparkListenerTaskStart(task.stageId, stageInfo.attemptId, taskInfo)) submitWaitingStages() } @@ -843,6 +844,8 @@ class DAGScheduler( } } + stage.info = StageInfo.fromStage(stage, Some(tasks.size)) + if (tasks.size > 0) { // Preemptively serialize a task to make sure it can be serialized. We are catching this // exception here because it would be fairly hard to catch the non-serializable exception @@ -887,13 +890,14 @@ class DAGScheduler( private[scheduler] def handleTaskCompletion(event: CompletionEvent) { val task = event.task val stageId = task.stageId + val stageInfo = stageIdToStage(task.stageId).info val taskType = Utils.getFormattedClassName(task) // The success case is dealt with separately below, since we need to compute accumulator // updates before posting. if (event.reason != Success) { - listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo, - event.taskMetrics)) + listenerBus.post(SparkListenerTaskEnd(stageId, stageInfo.attemptId, taskType, event.reason, + event.taskInfo, event.taskMetrics)) } if (!stageIdToStage.contains(task.stageId)) { @@ -935,8 +939,8 @@ class DAGScheduler( logError(s"Failed to update accumulators for $task", e) } } - listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo, - event.taskMetrics)) + listenerBus.post(SparkListenerTaskEnd(stageId, stageInfo.attemptId, taskType, event.reason, + event.taskInfo, event.taskMetrics)) stage.pendingTasks -= task task match { case rt: ResultTask[_, _] => @@ -1029,6 +1033,7 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => // Mark the stage that the reducer was in as unrunnable val failedStage = stageIdToStage(task.stageId) + listenerBus.post(SparkListenerStageCompleted(failedStage.info)) runningStages -= failedStage // TODO: Cancel running tasks in the stage logInfo("Marking " + failedStage + " (" + failedStage.name + diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index d01d318633877..86ca8445a1124 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -39,7 +39,8 @@ case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Propert case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent @DeveloperApi -case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends SparkListenerEvent +case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo) + extends SparkListenerEvent @DeveloperApi case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent @@ -47,6 +48,7 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe @DeveloperApi case class SparkListenerTaskEnd( stageId: Int, + stageAttemptId: Int, taskType: String, reason: TaskEndReason, taskInfo: TaskInfo, @@ -75,10 +77,15 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId) @DeveloperApi case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent +/** + * Periodic updates from executors. + * @param execId executor id + * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics) + */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, - taskMetrics: Seq[(Long, Int, TaskMetrics)]) + taskMetrics: Seq[(Long, Int, Int, TaskMetrics)]) extends SparkListenerEvent @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 800905413d145..1636da24fa78a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -116,6 +116,7 @@ private[spark] class Stage( } } + /** Return a new attempt id, starting with 0. */ def newAttemptId(): Int = { val id = nextAttemptId nextAttemptId += 1 diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 2a407e47a05bd..c6dc3369ba5cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -29,6 +29,7 @@ import org.apache.spark.storage.RDDInfo @DeveloperApi class StageInfo( val stageId: Int, + val attemptId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo], @@ -56,9 +57,15 @@ private[spark] object StageInfo { * shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a * sequence of narrow dependencies should also be associated with this Stage. */ - def fromStage(stage: Stage): StageInfo = { + def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = { val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd) val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos - new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details) + new StageInfo( + stage.id, + stage.attemptId, + stage.name, + numTasks.getOrElse(stage.numTasks), + rddInfos, + stage.details) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6c0d1b2752a81..ad051e59af86d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -333,12 +333,12 @@ private[spark] class TaskSchedulerImpl( execId: String, taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics blockManagerId: BlockManagerId): Boolean = { - val metricsWithStageIds = taskMetrics.flatMap { - case (id, metrics) => { + + val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized { + taskMetrics.flatMap { case (id, metrics) => taskIdToTaskSetId.get(id) .flatMap(activeTaskSets.get) - .map(_.stageId) - .map(x => (id, x, metrics)) + .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics)) } } dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala index 613fa7850bb25..c3ad325156f53 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala @@ -31,9 +31,5 @@ private[spark] class TaskSet( val properties: Properties) { val id: String = stageId + "." + attempt - def kill(interruptThread: Boolean) { - tasks.foreach(_.kill(interruptThread)) - } - override def toString: String = "TaskSet " + id } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 0cc51c873727d..2987dc04494a5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -24,8 +24,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils} import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils -/** Page showing executor summary */ -private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { +/** Stage summary grouped by executors. */ +private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobProgressTab) { private val listener = parent.listener def toNodeSeq: Seq[Node] = { @@ -65,7 +65,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { executorIdToAddress.put(executorId, address) } - listener.stageIdToData.get(stageId) match { + listener.stageIdToData.get((stageId, stageAttemptId)) match { case Some(stageData: StageUIData) => stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) => diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 74cd637d88155..6b72a27e81aff 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -43,13 +43,16 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // How many stages to remember val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES) - val activeStages = HashMap[Int, StageInfo]() + // Map from stageId to StageInfo + val activeStages = new HashMap[Int, StageInfo] + + // Map from (stageId, attemptId) to StageInfo + val stageIdToData = new HashMap[(Int, Int), StageUIData] + val completedStages = ListBuffer[StageInfo]() val failedStages = ListBuffer[StageInfo]() - val stageIdToData = new HashMap[Int, StageUIData] - - val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]() + val poolToActiveStages = HashMap[String, HashMap[(Int, Int), StageInfo]]() val executorIdToBlockManagerId = HashMap[String, BlockManagerId]() @@ -59,9 +62,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { val stage = stageCompleted.stageInfo - val stageId = stage.stageId - val stageData = stageIdToData.getOrElseUpdate(stageId, { - logWarning("Stage completed for unknown stage " + stageId) + val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), { + logWarning("Stage completed for unknown stage " + stage.stageId) new StageUIData }) @@ -69,8 +71,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.accumulables(id) = info } - poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId)) - activeStages.remove(stageId) + poolToActiveStages.get(stageData.schedulingPool).foreach { hashMap => + hashMap.remove((stage.stageId, stage.attemptId)) + } + activeStages.remove(stage.stageId) if (stage.failureReason.isEmpty) { completedStages += stage trimIfNecessary(completedStages) @@ -84,7 +88,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > retainedStages) { val toRemove = math.max(retainedStages / 10, 1) - stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) } + stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) } stages.trimStart(toRemove) } } @@ -98,21 +102,21 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) - val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData) + val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData) stageData.schedulingPool = poolName stageData.description = Option(stageSubmitted.properties).flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) } - val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]()) - stages(stage.stageId) = stage + val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[(Int, Int), StageInfo]()) + stages((stage.stageId, stage.attemptId)) = stage } override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { val taskInfo = taskStart.taskInfo if (taskInfo != null) { - val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, { + val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), { logWarning("Task start for unknown stage " + taskStart.stageId) new StageUIData }) @@ -129,7 +133,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val info = taskEnd.taskInfo if (info != null) { - val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, { + val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), { logWarning("Task end for unknown stage " + taskEnd.stageId) new StageUIData }) @@ -222,8 +226,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { - for ((taskId, sid, taskMetrics) <- executorMetricsUpdate.taskMetrics) { - val stageData = stageIdToData.getOrElseUpdate(sid, { + for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) { + val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), { logWarning("Metrics update for task in unknown stage " + sid) new StageUIData }) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index 64178e1e33d41..0421f2d1322d8 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -34,7 +34,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) { } private def poolTable( - makeRow: (Schedulable, HashMap[String, HashMap[Int, StageInfo]]) => Seq[Node], + makeRow: (Schedulable, HashMap[String, HashMap[(Int, Int), StageInfo]]) => Seq[Node], rows: Seq[Schedulable]): Seq[Node] = { @@ -53,7 +53,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) { private def poolRow( p: Schedulable, - poolToActiveStages: HashMap[String, HashMap[Int, StageInfo]]): Seq[Node] = { + poolToActiveStages: HashMap[String, HashMap[(Int, Int), StageInfo]]): Seq[Node] = { val activeStages = poolToActiveStages.get(p.name) match { case Some(stages) => stages.size case None => 0 diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index d4eb02722ad12..8448fb40899c4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -34,7 +34,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val stageId = request.getParameter("id").toInt - val stageDataOption = listener.stageIdToData.get(stageId) + val stageAttemptId = request.getParameter("attempt").toInt + val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId)) if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) { val content = @@ -49,7 +50,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime) val numCompleted = tasks.count(_.taskInfo.finished) - val accumulables = listener.stageIdToData(stageId).accumulables + val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables val hasInput = stageData.inputBytes > 0 val hasShuffleRead = stageData.shuffleReadBytes > 0 val hasShuffleWrite = stageData.shuffleWriteBytes > 0 @@ -211,7 +212,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { def quantileRow(data: Seq[Node]): Seq[Node] = {data} Some(UIUtils.listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true)) } - val executorTable = new ExecutorTable(stageId, parent) + + val executorTable = new ExecutorTable(stageId, stageAttemptId, parent) val maybeAccumulableTable: Seq[Node] = if (accumulables.size > 0) {

Accumulators

++ accumulableTable } else Seq() diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 16ad0df45aa0d..acc48345d5d7f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -97,8 +97,8 @@ private[ui] class StageTableBase( } // scalastyle:on - val nameLinkUri ="%s/stages/stage?id=%s" - .format(UIUtils.prependBaseUri(parent.basePath), s.stageId) + val nameLinkUri ="%s/stages/stage?id=%s&attempt=%s" + .format(UIUtils.prependBaseUri(parent.basePath), s.stageId, s.attemptId) val nameLink = {s.name} val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0) @@ -121,7 +121,7 @@ private[ui] class StageTableBase( } val stageDesc = for { - stageData <- listener.stageIdToData.get(s.stageId) + stageData <- listener.stageIdToData.get((s.stageId, s.attemptId)) desc <- stageData.description } yield {
{desc}
@@ -131,7 +131,7 @@ private[ui] class StageTableBase( } protected def stageRow(s: StageInfo): Seq[Node] = { - val stageDataOption = listener.stageIdToData.get(s.stageId) + val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId)) if (stageDataOption.isEmpty) { return } @@ -154,7 +154,11 @@ private[ui] class StageTableBase( val shuffleWrite = stageData.shuffleWriteBytes val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else "" - ++ + {if (s.attemptId > 0) { + + } else { + + }} ++ {if (isFairScheduler) {
{s.stageId}No data available for this stage{s.stageId}{s.stageId} (attempt {s.attemptId}){s.stageId} Utils.getFormattedClassName(taskStart)) ~ ("Stage ID" -> taskStart.stageId) ~ + ("Stage Attempt ID" -> taskStart.stageAttemptId) ~ ("Task Info" -> taskInfoToJson(taskInfo)) } @@ -112,6 +113,7 @@ private[spark] object JsonProtocol { val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing ("Event" -> Utils.getFormattedClassName(taskEnd)) ~ ("Stage ID" -> taskEnd.stageId) ~ + ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~ ("Task Type" -> taskEnd.taskType) ~ ("Task End Reason" -> taskEndReason) ~ ("Task Info" -> taskInfoToJson(taskInfo)) ~ @@ -187,6 +189,7 @@ private[spark] object JsonProtocol { val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing) val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing) ("Stage ID" -> stageInfo.stageId) ~ + ("Attempt ID" -> stageInfo.attemptId) ~ ("Stage Name" -> stageInfo.name) ~ ("Number of Tasks" -> stageInfo.numTasks) ~ ("RDD Info" -> rddInfo) ~ @@ -419,8 +422,9 @@ private[spark] object JsonProtocol { def taskStartFromJson(json: JValue): SparkListenerTaskStart = { val stageId = (json \ "Stage ID").extract[Int] + val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) val taskInfo = taskInfoFromJson(json \ "Task Info") - SparkListenerTaskStart(stageId, taskInfo) + SparkListenerTaskStart(stageId, stageAttemptId, taskInfo) } def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = { @@ -430,11 +434,12 @@ private[spark] object JsonProtocol { def taskEndFromJson(json: JValue): SparkListenerTaskEnd = { val stageId = (json \ "Stage ID").extract[Int] + val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) val taskType = (json \ "Task Type").extract[String] val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason") val taskInfo = taskInfoFromJson(json \ "Task Info") val taskMetrics = taskMetricsFromJson(json \ "Task Metrics") - SparkListenerTaskEnd(stageId, taskType, taskEndReason, taskInfo, taskMetrics) + SparkListenerTaskEnd(stageId, stageAttemptId, taskType, taskEndReason, taskInfo, taskMetrics) } def jobStartFromJson(json: JValue): SparkListenerJobStart = { @@ -492,6 +497,7 @@ private[spark] object JsonProtocol { def stageInfoFromJson(json: JValue): StageInfo = { val stageId = (json \ "Stage ID").extract[Int] + val attemptId = (json \ "Attempt ID").extractOpt[Int].getOrElse(0) val stageName = (json \ "Stage Name").extract[String] val numTasks = (json \ "Number of Tasks").extract[Int] val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_)) @@ -504,7 +510,7 @@ private[spark] object JsonProtocol { case None => Seq[AccumulableInfo]() } - val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details) + val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details) stageInfo.submissionTime = submissionTime stageInfo.completionTime = completionTime stageInfo.failureReason = failureReason diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala index 51fb646a3cb61..7671cb969a26b 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -69,10 +69,10 @@ class StorageStatusListenerSuite extends FunSuite { // Task end with no updated blocks assert(listener.executorIdToStorageStatus("big").numBlocks === 0) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics)) assert(listener.executorIdToStorageStatus("big").numBlocks === 0) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics)) assert(listener.executorIdToStorageStatus("big").numBlocks === 0) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) } @@ -92,13 +92,13 @@ class StorageStatusListenerSuite extends FunSuite { // Task end with new blocks assert(listener.executorIdToStorageStatus("big").numBlocks === 0) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1)) assert(listener.executorIdToStorageStatus("big").numBlocks === 2) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2))) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2)) assert(listener.executorIdToStorageStatus("big").numBlocks === 2) assert(listener.executorIdToStorageStatus("fat").numBlocks === 1) assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) @@ -111,13 +111,14 @@ class StorageStatusListenerSuite extends FunSuite { val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)) taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3)) taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3)) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) + + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1)) assert(listener.executorIdToStorageStatus("big").numBlocks === 1) assert(listener.executorIdToStorageStatus("fat").numBlocks === 1) assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2))) assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0))) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2)) assert(listener.executorIdToStorageStatus("big").numBlocks === 1) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) @@ -135,8 +136,8 @@ class StorageStatusListenerSuite extends FunSuite { val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L)) taskMetrics1.updatedBlocks = Some(Seq(block1, block2)) taskMetrics2.updatedBlocks = Some(Seq(block3)) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics2)) assert(listener.executorIdToStorageStatus("big").numBlocks === 3) // Unpersist RDD diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 147ec0bc52e39..a3c1c0c765fc9 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -34,12 +34,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc val listener = new JobProgressListener(conf) def createStageStartEvent(stageId: Int) = { - val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "") + val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "") SparkListenerStageSubmitted(stageInfo) } def createStageEndEvent(stageId: Int) = { - val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "") + val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "") SparkListenerStageCompleted(stageInfo) } @@ -70,33 +70,37 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc taskInfo.finishTime = 1 var task = new ShuffleMapTask(0) val taskType = Utils.getFormattedClassName(task) - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) - assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail()) - .shuffleRead === 1000) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics)) + assert(listener.stageIdToData.getOrElse((0, 0), fail()) + .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 1000) // finish a task with unknown executor-id, nothing should happen taskInfo = new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true) taskInfo.finishTime = 1 task = new ShuffleMapTask(0) - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics)) assert(listener.stageIdToData.size === 1) // finish this task, should get updated duration taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 task = new ShuffleMapTask(0) - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) - assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail()) - .shuffleRead === 2000) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics)) + assert(listener.stageIdToData.getOrElse((0, 0), fail()) + .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 2000) // finish this task, should get updated duration taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 task = new ShuffleMapTask(0) - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) - assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail()) - .shuffleRead === 1000) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics)) + assert(listener.stageIdToData.getOrElse((0, 0), fail()) + .executorSummary.getOrElse("exe-2", fail()).shuffleRead === 1000) } test("test task success vs failure counting for different task end reasons") { @@ -119,16 +123,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc UnknownReason) var failCount = 0 for (reason <- taskFailedReasons) { - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics)) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 0, taskType, reason, taskInfo, metrics)) failCount += 1 - assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0) - assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount) + assert(listener.stageIdToData((task.stageId, 0)).numCompleteTasks === 0) + assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount) } // Make sure we count success as success. - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics)) - assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1) - assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 1, taskType, Success, taskInfo, metrics)) + assert(listener.stageIdToData((task.stageId, 1)).numCompleteTasks === 1) + assert(listener.stageIdToData((task.stageId, 1)).numFailedTasks === failCount) } test("test update metrics") { @@ -163,18 +169,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc taskInfo } - listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1234L))) - listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1235L))) - listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1236L))) - listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1237L))) + listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1234L))) + listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1235L))) + listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L))) + listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L))) listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array( - (1234L, 0, makeTaskMetrics(0)), - (1235L, 0, makeTaskMetrics(100)), - (1236L, 1, makeTaskMetrics(200))))) + (1234L, 0, 0, makeTaskMetrics(0)), + (1235L, 0, 0, makeTaskMetrics(100)), + (1236L, 1, 0, makeTaskMetrics(200))))) - var stage0Data = listener.stageIdToData.get(0).get - var stage1Data = listener.stageIdToData.get(1).get + var stage0Data = listener.stageIdToData.get((0, 0)).get + var stage1Data = listener.stageIdToData.get((1, 0)).get assert(stage0Data.shuffleReadBytes == 102) assert(stage1Data.shuffleReadBytes == 201) assert(stage0Data.shuffleWriteBytes == 106) @@ -195,14 +201,14 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc .totalBlocksFetched == 202) // task that was included in a heartbeat - listener.onTaskEnd(SparkListenerTaskEnd(0, taskType, Success, makeTaskInfo(1234L, 1), + listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1), makeTaskMetrics(300))) // task that wasn't included in a heartbeat - listener.onTaskEnd(SparkListenerTaskEnd(1, taskType, Success, makeTaskInfo(1237L, 1), + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1), makeTaskMetrics(400))) - stage0Data = listener.stageIdToData.get(0).get - stage1Data = listener.stageIdToData.get(1).get + stage0Data = listener.stageIdToData.get((0, 0)).get + stage1Data = listener.stageIdToData.get((1, 0)).get assert(stage0Data.shuffleReadBytes == 402) assert(stage1Data.shuffleReadBytes == 602) assert(stage0Data.shuffleWriteBytes == 406) diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 6e68dcb3425aa..b860177705d84 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -53,7 +53,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { assert(storageListener.rddInfoList.isEmpty) // 2 RDDs are known, but none are cached - val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0, rddInfo1), "details") + val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), "details") bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 2) assert(storageListener.rddInfoList.isEmpty) @@ -63,7 +63,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val rddInfo3Cached = rddInfo3 rddInfo2Cached.numCachedPartitions = 1 rddInfo3Cached.numCachedPartitions = 1 - val stageInfo1 = new StageInfo(1, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details") + val stageInfo1 = new StageInfo(1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details") bus.postToAll(SparkListenerStageSubmitted(stageInfo1)) assert(storageListener._rddInfoMap.size === 4) assert(storageListener.rddInfoList.size === 2) @@ -71,7 +71,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { // Submitting RDDInfos with duplicate IDs does nothing val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY) rddInfo0Cached.numCachedPartitions = 1 - val stageInfo0Cached = new StageInfo(0, "0", 100, Seq(rddInfo0), "details") + val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), "details") bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached)) assert(storageListener._rddInfoMap.size === 4) assert(storageListener.rddInfoList.size === 2) @@ -87,7 +87,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val rddInfo1Cached = rddInfo1 rddInfo0Cached.numCachedPartitions = 1 rddInfo1Cached.numCachedPartitions = 1 - val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details") + val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details") bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 2) assert(storageListener.rddInfoList.size === 2) @@ -106,7 +106,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val myRddInfo0 = rddInfo0 val myRddInfo1 = rddInfo1 val myRddInfo2 = rddInfo2 - val stageInfo0 = new StageInfo(0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details") + val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details") bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L)) bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 3) @@ -116,7 +116,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { assert(!storageListener._rddInfoMap(2).isCached) // Task end with no updated blocks. This should not change anything. - bus.postToAll(SparkListenerTaskEnd(0, "obliteration", Success, taskInfo, new TaskMetrics)) + bus.postToAll(SparkListenerTaskEnd(0, 0, "obliteration", Success, taskInfo, new TaskMetrics)) assert(storageListener._rddInfoMap.size === 3) assert(storageListener.rddInfoList.size === 0) @@ -128,7 +128,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)), (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L)) )) - bus.postToAll(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo, metrics1)) + bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1)) assert(storageListener._rddInfoMap(0).memSize === 800L) assert(storageListener._rddInfoMap(0).diskSize === 400L) assert(storageListener._rddInfoMap(0).tachyonSize === 200L) @@ -150,7 +150,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist )) - bus.postToAll(SparkListenerTaskEnd(2, "obliteration", Success, taskInfo, metrics2)) + bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2)) assert(storageListener._rddInfoMap(0).memSize === 400L) assert(storageListener._rddInfoMap(0).diskSize === 400L) assert(storageListener._rddInfoMap(0).tachyonSize === 200L) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 97ffb07662482..7164c3abf6a27 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -35,13 +35,13 @@ class JsonProtocolSuite extends FunSuite { val stageSubmitted = SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties) val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L)) - val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false)) + val taskStart = SparkListenerTaskStart(111, 0, makeTaskInfo(222L, 333, 1, 444L, false)) val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true)) - val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, + val taskEnd = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false)) - val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, + val taskEndWithHadoopInput = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true)) val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) @@ -485,7 +485,7 @@ class JsonProtocolSuite extends FunSuite { private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = { val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) } - val stageInfo = new StageInfo(a, "greetings", b, rddInfos, "details") + val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, "details") val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2)) stageInfo.accumulables(acc1.id) = acc1 stageInfo.accumulables(acc2.id) = acc2 From 6c08b07522302c4a21e8c70a8552250856b3820e Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 18 Aug 2014 18:55:05 -0700 Subject: [PATCH 2/7] Addressed code review feedback. --- .../apache/spark/scheduler/DAGScheduler.scala | 72 +++++++++++-------- .../org/apache/spark/scheduler/Stage.scala | 7 +- .../spark/ui/jobs/JobProgressListener.scala | 11 +-- .../org/apache/spark/ui/jobs/PoolTable.scala | 4 +- .../org/apache/spark/ui/jobs/StagePage.scala | 3 +- 5 files changed, 57 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f2652de086709..6b505429d15f5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -677,7 +677,7 @@ class DAGScheduler( } private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { - val stageInfo = stageIdToStage(task.stageId).info + val stageInfo = stageIdToStage(task.stageId).latestInfo listenerBus.post(SparkListenerTaskStart(task.stageId, stageInfo.attemptId, taskInfo)) submitWaitingStages() } @@ -696,8 +696,8 @@ class DAGScheduler( // is in the process of getting stopped. val stageFailedMessage = "Stage cancelled because SparkContext was shut down" runningStages.foreach { stage => - stage.info.stageFailed(stageFailedMessage) - listenerBus.post(SparkListenerStageCompleted(stage.info)) + stage.latestInfo.stageFailed(stageFailedMessage) + listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) } listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) } @@ -782,7 +782,16 @@ class DAGScheduler( logDebug("submitMissingTasks(" + stage + ")") // Get our pending tasks and remember them in our pendingTasks entry stage.pendingTasks.clear() - var tasks = ArrayBuffer[Task[_]]() + + // First figure out the indexes of partition ids to compute. + val partitionsToCompute: Seq[Int] = { + if (stage.isShuffleMap) { + (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil) + } else { + val job = stage.resultOfJob.get + (0 until job.numPartitions).filter(id => !job.finished(id)) + } + } val properties = if (jobIdToActiveJob.contains(jobId)) { jobIdToActiveJob(stage.jobId).properties @@ -796,7 +805,8 @@ class DAGScheduler( // serializable. If tasks are not serializable, a SparkListenerStageCompleted event // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. - listenerBus.post(SparkListenerStageSubmitted(stage.info, properties)) + stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) + listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast @@ -827,25 +837,22 @@ class DAGScheduler( return } - if (stage.isShuffleMap) { - for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) { - val locs = getPreferredLocs(stage.rdd, p) - val part = stage.rdd.partitions(p) - tasks += new ShuffleMapTask(stage.id, taskBinary, part, locs) + val tasks: Seq[Task[_]] = if (stage.isShuffleMap) { + partitionsToCompute.map { id => + val locs = getPreferredLocs(stage.rdd, id) + val part = stage.rdd.partitions(id) + new ShuffleMapTask(stage.id, taskBinary, part, locs) } } else { - // This is a final stage; figure out its job's missing partitions val job = stage.resultOfJob.get - for (id <- 0 until job.numPartitions if !job.finished(id)) { + partitionsToCompute.map { id => val p: Int = job.partitions(id) val part = stage.rdd.partitions(p) val locs = getPreferredLocs(stage.rdd, p) - tasks += new ResultTask(stage.id, taskBinary, part, locs, id) + new ResultTask(stage.id, taskBinary, part, locs, id) } } - stage.info = StageInfo.fromStage(stage, Some(tasks.size)) - if (tasks.size > 0) { // Preemptively serialize a task to make sure it can be serialized. We are catching this // exception here because it would be fairly hard to catch the non-serializable exception @@ -872,11 +879,11 @@ class DAGScheduler( logDebug("New pending tasks: " + stage.pendingTasks) taskScheduler.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) - stage.info.submissionTime = Some(clock.getTime()) + stage.latestInfo.submissionTime = Some(clock.getTime()) } else { // Because we posted SparkListenerStageSubmitted earlier, we should post // SparkListenerStageCompleted here in case there are no tasks to run. - listenerBus.post(SparkListenerStageCompleted(stage.info)) + listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) logDebug("Stage " + stage + " is actually done; %b %d %d".format( stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions)) runningStages -= stage @@ -890,7 +897,7 @@ class DAGScheduler( private[scheduler] def handleTaskCompletion(event: CompletionEvent) { val task = event.task val stageId = task.stageId - val stageInfo = stageIdToStage(task.stageId).info + val stageInfo = stageIdToStage(task.stageId).latestInfo val taskType = Utils.getFormattedClassName(task) // The success case is dealt with separately below, since we need to compute accumulator @@ -906,14 +913,19 @@ class DAGScheduler( } val stage = stageIdToStage(task.stageId) - def markStageAsFinished(stage: Stage) = { - val serviceTime = stage.info.submissionTime match { + def markStageAsFinished(stage: Stage, isSuccessful: Boolean) = { + val serviceTime = stage.latestInfo.submissionTime match { case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0) case _ => "Unknown" } - logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) - stage.info.completionTime = Some(clock.getTime()) - listenerBus.post(SparkListenerStageCompleted(stage.info)) + if (isSuccessful) { + logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) + } else { + + logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime)) + } + stage.latestInfo.completionTime = Some(clock.getTime()) + listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) runningStages -= stage } event.reason match { @@ -928,7 +940,7 @@ class DAGScheduler( val name = acc.name.get val stringPartialValue = Accumulators.stringifyPartialValue(partialValue) val stringValue = Accumulators.stringifyValue(acc.value) - stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue) + stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue) event.taskInfo.accumulables += AccumulableInfo(id, name, Some(stringPartialValue), stringValue) } @@ -951,7 +963,7 @@ class DAGScheduler( job.numFinished += 1 // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { - markStageAsFinished(stage) + markStageAsFinished(stage, isSuccessful = true) cleanupStateForJobAndIndependentStages(job) listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) } @@ -980,7 +992,7 @@ class DAGScheduler( stage.addOutputLoc(smt.partitionId, status) } if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) { - markStageAsFinished(stage) + markStageAsFinished(stage, isSuccessful = true) logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) logInfo("waiting: " + waitingStages) @@ -1033,7 +1045,7 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => // Mark the stage that the reducer was in as unrunnable val failedStage = stageIdToStage(task.stageId) - listenerBus.post(SparkListenerStageCompleted(failedStage.info)) + markStageAsFinished(failedStage, isSuccessful = false) runningStages -= failedStage // TODO: Cancel running tasks in the stage logInfo("Marking " + failedStage + " (" + failedStage.name + @@ -1147,7 +1159,7 @@ class DAGScheduler( } val dependentJobs: Seq[ActiveJob] = activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq - failedStage.info.completionTime = Some(clock.getTime()) + failedStage.latestInfo.completionTime = Some(clock.getTime()) for (job <- dependentJobs) { failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason") } @@ -1187,8 +1199,8 @@ class DAGScheduler( if (runningStages.contains(stage)) { try { // cancelTasks will fail if a SchedulerBackend does not implement killTask taskScheduler.cancelTasks(stageId, shouldInterruptThread) - stage.info.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stage.info)) + stage.latestInfo.stageFailed(failureReason) + listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) } catch { case e: UnsupportedOperationException => logInfo(s"Could not cancel tasks for stage $stageId", e) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 1636da24fa78a..071568cdfb429 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -43,6 +43,9 @@ import org.apache.spark.util.CallSite * stage, the callSite gives the user code that created the RDD being shuffled. For a result * stage, the callSite gives the user code that executes the associated action (e.g. count()). * + * A single stage can consist of multiple attempts. In that case, the latestInfo field will + * be updated for each attempt. + * */ private[spark] class Stage( val id: Int, @@ -71,8 +74,8 @@ private[spark] class Stage( val name = callSite.shortForm val details = callSite.longForm - /** Pointer to the [StageInfo] object, set by DAGScheduler. */ - var info: StageInfo = StageInfo.fromStage(this) + /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */ + var latestInfo: StageInfo = StageInfo.fromStage(this) def isAvailable: Boolean = { if (!isShuffleMap) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 6b72a27e81aff..6ac852fefca34 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -46,13 +46,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // Map from stageId to StageInfo val activeStages = new HashMap[Int, StageInfo] - // Map from (stageId, attemptId) to StageInfo + // Map from (stageId, attemptId) to StageUIData val stageIdToData = new HashMap[(Int, Int), StageUIData] val completedStages = ListBuffer[StageInfo]() val failedStages = ListBuffer[StageInfo]() - val poolToActiveStages = HashMap[String, HashMap[(Int, Int), StageInfo]]() + // Map from pool name to a hash map (map from stage id to StageInfo). + val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]() val executorIdToBlockManagerId = HashMap[String, BlockManagerId]() @@ -72,7 +73,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } poolToActiveStages.get(stageData.schedulingPool).foreach { hashMap => - hashMap.remove((stage.stageId, stage.attemptId)) + hashMap.remove(stage.stageId) } activeStages.remove(stage.stageId) if (stage.failureReason.isEmpty) { @@ -109,8 +110,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) } - val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[(Int, Int), StageInfo]()) - stages((stage.stageId, stage.attemptId)) = stage + val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]) + stages(stage.stageId) = stage } override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index 0421f2d1322d8..64178e1e33d41 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -34,7 +34,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) { } private def poolTable( - makeRow: (Schedulable, HashMap[String, HashMap[(Int, Int), StageInfo]]) => Seq[Node], + makeRow: (Schedulable, HashMap[String, HashMap[Int, StageInfo]]) => Seq[Node], rows: Seq[Schedulable]): Seq[Node] = { @@ -53,7 +53,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) { private def poolRow( p: Schedulable, - poolToActiveStages: HashMap[String, HashMap[(Int, Int), StageInfo]]): Seq[Node] = { + poolToActiveStages: HashMap[String, HashMap[Int, StageInfo]]): Seq[Node] = { val activeStages = poolToActiveStages.get(p.name) match { case Some(stages) => stages.size case None => 0 diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 8448fb40899c4..db01be596e073 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -43,7 +43,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {

Summary Metrics

No tasks have started yet

Tasks

No tasks have started yet - return UIUtils.headerSparkPage("Details for Stage %s".format(stageId), content, parent) + return UIUtils.headerSparkPage( + s"Details for Stage $stageId (Attempt $stageAttemptId)", content, parent) } val stageData = stageDataOption.get From 0f3607501325cf2a1cf9443371c1a43a578e7119 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 19 Aug 2014 00:03:01 -0700 Subject: [PATCH 3/7] Mark unknown stage attempt with id -1 and drop that in JobProgressListener. --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 ++--- .../scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 5 ++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6b505429d15f5..6b37efae51020 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -897,13 +897,13 @@ class DAGScheduler( private[scheduler] def handleTaskCompletion(event: CompletionEvent) { val task = event.task val stageId = task.stageId - val stageInfo = stageIdToStage(task.stageId).latestInfo val taskType = Utils.getFormattedClassName(task) // The success case is dealt with separately below, since we need to compute accumulator // updates before posting. if (event.reason != Success) { - listenerBus.post(SparkListenerTaskEnd(stageId, stageInfo.attemptId, taskType, event.reason, + val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).orElse(-1) + listenerBus.post(SparkListenerTaskEnd(stageId, stageAttemptId, taskType, event.reason, event.taskInfo, event.taskMetrics)) } @@ -921,7 +921,6 @@ class DAGScheduler( if (isSuccessful) { logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) } else { - logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime)) } stage.latestInfo.completionTime = Some(clock.getTime()) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 6ac852fefca34..f7f918fd521a9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -133,7 +133,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val info = taskEnd.taskInfo - if (info != null) { + // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task + // compeletion event is for. Let's just drop it here. This means we might have some speculation + // tasks on the web ui that's never marked as complete. + if (info != null && taskEnd.stageAttemptId != -1) { val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), { logWarning("Task end for unknown stage " + taskEnd.stageId) new StageUIData From b3e2eedb44a10fa2283ba71cd1078798851da756 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 19 Aug 2014 00:13:39 -0700 Subject: [PATCH 4/7] Oops previous code didn't compile. --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6b37efae51020..8ee821e542021 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -902,8 +902,8 @@ class DAGScheduler( // The success case is dealt with separately below, since we need to compute accumulator // updates before posting. if (event.reason != Success) { - val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).orElse(-1) - listenerBus.post(SparkListenerTaskEnd(stageId, stageAttemptId, taskType, event.reason, + val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1) + listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason, event.taskInfo, event.taskMetrics)) } @@ -950,8 +950,8 @@ class DAGScheduler( logError(s"Failed to update accumulators for $task", e) } } - listenerBus.post(SparkListenerTaskEnd(stageId, stageInfo.attemptId, taskType, event.reason, - event.taskInfo, event.taskMetrics)) + listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType, + event.reason, event.taskInfo, event.taskMetrics)) stage.pendingTasks -= task task match { case rt: ResultTask[_, _] => From c414c365211d8fe775838369d1d3c9b3d3841985 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 19 Aug 2014 22:26:57 -0700 Subject: [PATCH 5/7] Fixed the hanging in JobCancellationSuite. --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8ee821e542021..75843af639ad5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -677,8 +677,10 @@ class DAGScheduler( } private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { - val stageInfo = stageIdToStage(task.stageId).latestInfo - listenerBus.post(SparkListenerTaskStart(task.stageId, stageInfo.attemptId, taskInfo)) + // Note that there is a chance that this task is launched after the stage is cancelled. + // In that case, we wouldn't have the stage anymore in stageIdToStage. + val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1) + listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo)) submitWaitingStages() } From 40a6bd55f32273bab93fac504f26473a9c0364f3 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Aug 2014 00:07:51 -0700 Subject: [PATCH 6/7] Updated test suites. --- .../org/apache/spark/util/JsonProtocol.scala | 2 +- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 468 ++++++++++++++---- 3 files changed, 371 insertions(+), 101 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 2ab9660a509ea..db7384705fc1b 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -189,7 +189,7 @@ private[spark] object JsonProtocol { val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing) val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing) ("Stage ID" -> stageInfo.stageId) ~ - ("Attempt ID" -> stageInfo.attemptId) ~ + ("Stage Attempt ID" -> stageInfo.attemptId) ~ ("Stage Name" -> stageInfo.name) ~ ("Number of Tasks" -> stageInfo.numTasks) ~ ("RDD Info" -> rddInfo) ~ diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index a3c1c0c765fc9..3370dd4156c3f 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -134,7 +134,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc listener.onTaskEnd( SparkListenerTaskEnd(task.stageId, 1, taskType, Success, taskInfo, metrics)) assert(listener.stageIdToData((task.stageId, 1)).numCompleteTasks === 1) - assert(listener.stageIdToData((task.stageId, 1)).numFailedTasks === failCount) + assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount) } test("test update metrics") { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 7164c3abf6a27..2fd3b9cfd221a 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -397,7 +397,8 @@ class JsonProtocolSuite extends FunSuite { private def assertJsonStringEquals(json1: String, json2: String) { val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "") - assert(formatJsonString(json1) === formatJsonString(json2)) + assert(formatJsonString(json1) === formatJsonString(json2), + s"input ${formatJsonString(json1)} got ${formatJsonString(json2)}") } private def assertSeqEquals[T](seq1: Seq[T], seq2: Seq[T], assertEquals: (T, T) => Unit) { @@ -558,84 +559,246 @@ class JsonProtocolSuite extends FunSuite { private val stageSubmittedJsonString = """ - {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name": - "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details", - "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]},"Properties": - {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} + |{ + | "Event": "SparkListenerStageSubmitted", + | "Stage Info": { + | "Stage ID": 100, + | "Stage Attempt ID": 0, + | "Stage Name": "greetings", + | "Number of Tasks": 200, + | "RDD Info": [], + | "Details": "details", + | "Accumulables": [ + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | } + | ] + | }, + | "Properties": { + | "France": "Paris", + | "Germany": "Berlin", + | "Russia": "Moscow", + | "Ukraine": "Kiev" + | } + |} """ private val stageCompletedJsonString = """ - {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name": - "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage - Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true, - "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301, - "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details", - "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]}} + |{ + | "Event": "SparkListenerStageCompleted", + | "Stage Info": { + | "Stage ID": 101, + | "Stage Attempt ID": 0, + | "Stage Name": "greetings", + | "Number of Tasks": 201, + | "RDD Info": [ + | { + | "RDD ID": 101, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 201, + | "Number of Cached Partitions": 301, + | "Memory Size": 401, + | "Tachyon Size": 0, + | "Disk Size": 501 + | } + | ], + | "Details": "details", + | "Accumulables": [ + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | } + | ] + | } + |} """ private val taskStartJsonString = """ - |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222, - |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir", - |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0, - |"Failed":false,"Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1", - |"Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - |{"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]}} + |{ + | "Event": "SparkListenerTaskStart", + | "Stage ID": 111, + | "Stage Attempt ID": 0, + | "Task Info": { + | "Task ID": 222, + | "Index": 333, + | "Attempt": 1, + | "Launch Time": 444, + | "Executor ID": "executor", + | "Host": "your kind sir", + | "Locality": "NODE_LOCAL", + | "Speculative": false, + | "Getting Result Time": 0, + | "Finish Time": 0, + | "Failed": false, + | "Accumulables": [ + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | }, + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 3, + | "Name": "Accumulable3", + | "Update": "delta3", + | "Value": "val3" + | } + | ] + | } + |} """.stripMargin private val taskGettingResultJsonString = """ - |{"Event":"SparkListenerTaskGettingResult","Task Info": - | {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor", - | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0, - | "Finish Time":0,"Failed":false, - | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1", - | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}] + |{ + | "Event": "SparkListenerTaskGettingResult", + | "Task Info": { + | "Task ID": 1000, + | "Index": 2000, + | "Attempt": 5, + | "Launch Time": 3000, + | "Executor ID": "executor", + | "Host": "your kind sir", + | "Locality": "NODE_LOCAL", + | "Speculative": true, + | "Getting Result Time": 0, + | "Finish Time": 0, + | "Failed": false, + | "Accumulables": [ + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | }, + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 3, + | "Name": "Accumulable3", + | "Update": "delta3", + | "Value": "val3" + | } + | ] | } |} """.stripMargin private val taskEndJsonString = """ - |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", - |"Task End Reason":{"Reason":"Success"}, - |"Task Info":{ - | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", - | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, - | "Getting Result Time":0,"Finish Time":0,"Failed":false, - | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1", - | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}] - |}, - |"Task Metrics":{ - | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, - | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700, - | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, - | "Shuffle Read Metrics":{ - | "Shuffle Finish Time":900, - | "Remote Blocks Fetched":800, - | "Local Blocks Fetched":700, - | "Fetch Wait Time":900, - | "Remote Bytes Read":1000 + |{ + | "Event": "SparkListenerTaskEnd", + | "Stage ID": 1, + | "Stage Attempt ID": 0, + | "Task Type": "ShuffleMapTask", + | "Task End Reason": { + | "Reason": "Success" | }, - | "Shuffle Write Metrics":{ - | "Shuffle Bytes Written":1200, - | "Shuffle Write Time":1500 + | "Task Info": { + | "Task ID": 123, + | "Index": 234, + | "Attempt": 67, + | "Launch Time": 345, + | "Executor ID": "executor", + | "Host": "your kind sir", + | "Locality": "NODE_LOCAL", + | "Speculative": false, + | "Getting Result Time": 0, + | "Finish Time": 0, + | "Failed": false, + | "Accumulables": [ + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | }, + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 3, + | "Name": "Accumulable3", + | "Update": "delta3", + | "Value": "val3" + | } + | ] | }, - | "Updated Blocks":[ - | {"Block ID":"rdd_0_0", - | "Status":{ - | "Storage Level":{ - | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, - | "Replication":2 - | }, - | "Memory Size":0,"Tachyon Size":0,"Disk Size":0 + | "Task Metrics": { + | "Host Name": "localhost", + | "Executor Deserialize Time": 300, + | "Executor Run Time": 400, + | "Result Size": 500, + | "JVM GC Time": 600, + | "Result Serialization Time": 700, + | "Memory Bytes Spilled": 800, + | "Disk Bytes Spilled": 0, + | "Shuffle Read Metrics": { + | "Shuffle Finish Time": 900, + | "Remote Blocks Fetched": 800, + | "Local Blocks Fetched": 700, + | "Fetch Wait Time": 900, + | "Remote Bytes Read": 1000 + | }, + | "Shuffle Write Metrics": { + | "Shuffle Bytes Written": 1200, + | "Shuffle Write Time": 1500 + | }, + | "Updated Blocks": [ + | { + | "Block ID": "rdd_0_0", + | "Status": { + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": false, + | "Replication": 2 + | }, + | "Memory Size": 0, + | "Tachyon Size": 0, + | "Disk Size": 0 + | } | } - | } | ] | } |} @@ -643,80 +806,187 @@ class JsonProtocolSuite extends FunSuite { private val taskEndWithHadoopInputJsonString = """ - |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", - |"Task End Reason":{"Reason":"Success"}, - |"Task Info":{ - | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", - | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, - | "Getting Result Time":0,"Finish Time":0,"Failed":false, - | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1", - | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}] - |}, - |"Task Metrics":{ - | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, - | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700, - | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, - | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500}, - | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100}, - | "Updated Blocks":[ - | {"Block ID":"rdd_0_0", - | "Status":{ - | "Storage Level":{ - | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, - | "Replication":2 - | }, - | "Memory Size":0,"Tachyon Size":0,"Disk Size":0 + |{ + | "Event": "SparkListenerTaskEnd", + | "Stage ID": 1, + | "Stage Attempt ID": 0, + | "Task Type": "ShuffleMapTask", + | "Task End Reason": { + | "Reason": "Success" + | }, + | "Task Info": { + | "Task ID": 123, + | "Index": 234, + | "Attempt": 67, + | "Launch Time": 345, + | "Executor ID": "executor", + | "Host": "your kind sir", + | "Locality": "NODE_LOCAL", + | "Speculative": false, + | "Getting Result Time": 0, + | "Finish Time": 0, + | "Failed": false, + | "Accumulables": [ + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | }, + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 3, + | "Name": "Accumulable3", + | "Update": "delta3", + | "Value": "val3" | } - | } - | ]} + | ] + | }, + | "Task Metrics": { + | "Host Name": "localhost", + | "Executor Deserialize Time": 300, + | "Executor Run Time": 400, + | "Result Size": 500, + | "JVM GC Time": 600, + | "Result Serialization Time": 700, + | "Memory Bytes Spilled": 800, + | "Disk Bytes Spilled": 0, + | "Shuffle Write Metrics": { + | "Shuffle Bytes Written": 1200, + | "Shuffle Write Time": 1500 + | }, + | "Input Metrics": { + | "Data Read Method": "Hadoop", + | "Bytes Read": 2100 + | }, + | "Updated Blocks": [ + | { + | "Block ID": "rdd_0_0", + | "Status": { + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": false, + | "Replication": 2 + | }, + | "Memory Size": 0, + | "Tachyon Size": 0, + | "Disk Size": 0 + | } + | } + | ] + | } |} """ private val jobStartJsonString = """ - {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties": - {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} + |{ + | "Event": "SparkListenerJobStart", + | "Job ID": 10, + | "Stage IDs": [ + | 1, + | 2, + | 3, + | 4 + | ], + | "Properties": { + | "France": "Paris", + | "Germany": "Berlin", + | "Russia": "Moscow", + | "Ukraine": "Kiev" + | } + |} """ private val jobEndJsonString = """ - {"Event":"SparkListenerJobEnd","Job ID":20,"Job Result":{"Result":"JobSucceeded"}} + |{ + | "Event": "SparkListenerJobEnd", + | "Job ID": 20, + | "Job Result": { + | "Result": "JobSucceeded" + | } + |} """ private val environmentUpdateJsonString = """ - {"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"GC speed":"9999 objects/s", - "Java home":"Land of coffee"},"Spark Properties":{"Job throughput":"80000 jobs/s, - regardless of job type"},"System Properties":{"Username":"guest","Password":"guest"}, - "Classpath Entries":{"Super library":"/tmp/super_library"}} + |{ + | "Event": "SparkListenerEnvironmentUpdate", + | "JVM Information": { + | "GC speed": "9999 objects/s", + | "Java home": "Land of coffee" + | }, + | "Spark Properties": { + | "Job throughput": "80000 jobs/s, regardless of job type" + | }, + | "System Properties": { + | "Username": "guest", + | "Password": "guest" + | }, + | "Classpath Entries": { + | "Super library": "/tmp/super_library" + | } + |} """ private val blockManagerAddedJsonString = """ - {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"Stars", - "Host":"In your multitude...","Port":300,"Netty Port":400},"Maximum Memory":500} + |{ + | "Event": "SparkListenerBlockManagerAdded", + | "Block Manager ID": { + | "Executor ID": "Stars", + | "Host": "In your multitude...", + | "Port": 300, + | "Netty Port": 400 + | }, + | "Maximum Memory": 500 + |} """ private val blockManagerRemovedJsonString = """ - {"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"Scarce", - "Host":"to be counted...","Port":100,"Netty Port":200}} + |{ + | "Event": "SparkListenerBlockManagerRemoved", + | "Block Manager ID": { + | "Executor ID": "Scarce", + | "Host": "to be counted...", + | "Port": 100, + | "Netty Port": 200 + | } + |} """ private val unpersistRDDJsonString = """ - {"Event":"SparkListenerUnpersistRDD","RDD ID":12345} + |{ + | "Event": "SparkListenerUnpersistRDD", + | "RDD ID": 12345 + |} """ private val applicationStartJsonString = """ - {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","Timestamp":42, - "User":"Garfield"} + |{ + | "Event": "SparkListenerApplicationStart", + | "App Name": "The winner of all", + | "Timestamp": 42, + | "User": "Garfield" + |} """ private val applicationEndJsonString = """ - {"Event":"SparkListenerApplicationEnd","Timestamp":42} + |{ + | "Event": "SparkListenerApplicationEnd", + | "Timestamp": 42 + |} """ } From 3ee1d2a81ccf0e12d791ab256d6f85697b90e3bf Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Aug 2014 11:51:36 -0700 Subject: [PATCH 7/7] - Rename attempt to retry in UI. - Properly report stage failure in FetchFailed. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 13 +++++++------ .../scala/org/apache/spark/ui/jobs/StageTable.scala | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 75843af639ad5..34131984570e4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -915,17 +915,18 @@ class DAGScheduler( } val stage = stageIdToStage(task.stageId) - def markStageAsFinished(stage: Stage, isSuccessful: Boolean) = { + def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = { val serviceTime = stage.latestInfo.submissionTime match { case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0) case _ => "Unknown" } - if (isSuccessful) { + if (errorMessage.isEmpty) { logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) + stage.latestInfo.completionTime = Some(clock.getTime()) } else { + stage.latestInfo.stageFailed(errorMessage.get) logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime)) } - stage.latestInfo.completionTime = Some(clock.getTime()) listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) runningStages -= stage } @@ -964,7 +965,7 @@ class DAGScheduler( job.numFinished += 1 // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { - markStageAsFinished(stage, isSuccessful = true) + markStageAsFinished(stage) cleanupStateForJobAndIndependentStages(job) listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) } @@ -993,7 +994,7 @@ class DAGScheduler( stage.addOutputLoc(smt.partitionId, status) } if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) { - markStageAsFinished(stage, isSuccessful = true) + markStageAsFinished(stage) logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) logInfo("waiting: " + waitingStages) @@ -1046,7 +1047,7 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => // Mark the stage that the reducer was in as unrunnable val failedStage = stageIdToStage(task.stageId) - markStageAsFinished(failedStage, isSuccessful = false) + markStageAsFinished(failedStage, Some("Fetch failure")) runningStages -= failedStage // TODO: Cancel running tasks in the stage logInfo("Marking " + failedStage + " (" + failedStage.name + diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index acc48345d5d7f..2e67310594784 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -155,7 +155,7 @@ private[ui] class StageTableBase( val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else "" {if (s.attemptId > 0) { - + } else { }} ++
{s.stageId} (attempt {s.attemptId}){s.stageId} (retry {s.attemptId}){s.stageId}