From b587cf2410ba5e55f91eb3cb3b7478cbf816f5ce Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 1 May 2015 18:04:01 +0900 Subject: [PATCH] initial commit --- .../apache/spark/ui/static/timeline-view.css | 52 ++++- .../apache/spark/ui/static/timeline-view.js | 29 +++ .../org/apache/spark/ui/jobs/StagePage.scala | 202 +++++++++++++++++- 3 files changed, 278 insertions(+), 5 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css index 35ef14e5aaf1a..da7b7b910ad68 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css +++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css @@ -19,10 +19,60 @@ div#application-timeline, div#job-timeline { margin-bottom: 30px; } -#application-timeline div.legend-area { +#application-timeline div.legend-area, +#job-timeline div.legend-area { margin-top: 5px; } +#task-assignment-timeline div.legend-area { + float: right; + border: 1px solid #000000; +} + +#task-assignment-timeline .legend-area>svg { + width: 600px; + height: 80px; +} + +#task-assignment-timeline>div.control-panel { + float: left; +} + +#task-assignment-timeline>.timeline-header:after { + content: ""; + clear: both; + height: 0; + display: block; + visibility: hidden; +} + +#task-assignment-timeline div.item.range { + padding: 0px; + height: 40px; +} + +.task-assignment-timeline-content { + width: 100%; + height: 19px; +} + +.task-assignment-timeline-duration-bar { + width: 100%; + height: 19px; +} + +.vis.timeline .item.task.succeeded { + background-color: #D5DDF6; +} + +.vis.timeline .item.task.failed { + background-color: #FF5475; +} + +.vis.timeline .item.task.running { + background-color: #FDFFCA; +} + .vis.timeline div.content { width: 100%; } diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js index e4a891d47f035..23f324115bbd5 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js +++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js @@ -140,6 +140,35 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) { }); } +function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, zoomMax) { + var groups = new vis.DataSet(groupArray); + var items = new vis.DataSet(eventObjArray); + var container = $("#task-assignment-timeline")[0] + var options = { + groupOrder: function(a, b) { + return a.value - b.value + }, + editable: false, + align: 'left', + selectable: false, + showCurrentTime: false, + min: minLaunchTime, + zoomable: false, + zoomMax: zoomMax + }; + + var taskTimeline = new vis.Timeline(container) + taskTimeline.setOptions(options); + taskTimeline.setGroups(groups); + taskTimeline.setItems(items); + var curEnd = taskTimeline.getWindow()["end"].getTime(); + if (curEnd - minLaunchTime > zoomMax) { + curEnd = minLaunchTime + zoomMax; + } + taskTimeline.setWindow(minLaunchTime, curEnd); + setupZoomable('#task-assignment-timeline-zoom-lock', taskTimeline); +} + function setupExecutorEventAction() { $(".item.box.executor").each(function () { $(this).hover( diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 797c9404bc449..27edd5eec3d27 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -20,6 +20,7 @@ package org.apache.spark.ui.jobs import java.util.Date import javax.servlet.http.HttpServletRequest +import scala.collection.mutable.HashSet import scala.xml.{Elem, Node, Unparsed} import org.apache.commons.lang3.StringEscapeUtils @@ -208,10 +209,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val unzipped = taskHeadersAndCssClasses.unzip + val currentTime = System.currentTimeMillis() val taskTable = UIUtils.listingTable( unzipped._1, taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput, - stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled), + stageData.hasShuffleRead, stageData.hasShuffleWrite, + stageData.hasBytesSpilled, currentTime), tasks, headerClasses = unzipped._2) // Excludes tasks which failed and have incomplete metrics @@ -431,6 +434,147 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val maybeAccumulableTable: Seq[Node] = if (accumulables.size > 0) {

Accumulators

++ accumulableTable } else Seq() + val executorsSet = new HashSet[(String, String)] + + var minLaunchTime = Long.MaxValue + var maxFinishTime = Long.MinValue + var numEffectiveTasks = 0 + val executorsArrayStr = stageData.taskData.flatMap { + case (_, taskUIData) => + val taskInfo = taskUIData.taskInfo + + val executorId = taskInfo.executorId + val host = taskInfo.host + executorsSet += ((executorId, host)) + + val taskId = taskInfo.taskId + val taskIdWithIndexAndAttempt = s"Task ${taskId}(${taskInfo.id})" + + val isSucceeded = taskInfo.successful + val isFailed = taskInfo.failed + val isRunning = taskInfo.running + val classNameByStatus = { + if (isSucceeded) { + "succeeded" + } else if (isFailed) { + "failed" + } else if (isRunning) { + "running" + } + } + + if (isSucceeded || isRunning || isFailed) { + val launchTime = taskInfo.launchTime + val finishTime = if (!isRunning) taskInfo.finishTime else currentTime + val totalExecutionTime = finishTime - launchTime + minLaunchTime = launchTime.min(minLaunchTime) + maxFinishTime = launchTime.max(maxFinishTime) + numEffectiveTasks += 1 + + val metricsOpt = taskUIData.taskMetrics + val shuffleReadTime = + metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L).toDouble + val shuffleReadTimeProportion = + (shuffleReadTime / totalExecutionTime * 100).toLong + val shuffleWriteTime = + metricsOpt.flatMap(_.shuffleWriteMetrics.map(_.shuffleWriteTime)).getOrElse(0L) / 1e6 + val shuffleWriteTimeProportion = + (shuffleWriteTime / totalExecutionTime * 100).toLong + val executorRuntimeProportion = + ((metricsOpt.map(_.executorRunTime).getOrElse(0L) - + shuffleReadTime - shuffleWriteTime) / totalExecutionTime * 100).toLong + val serializationTimeProportion = + (metricsOpt.map(_.resultSerializationTime).getOrElse(0L).toDouble / + totalExecutionTime * 100).toLong + val deserializationTimeProportion = + (metricsOpt.map(_.executorDeserializeTime).getOrElse(0L).toDouble / + totalExecutionTime * 100).toLong + val gettingResultTimeProportion = + (getGettingResultTime(taskUIData.taskInfo).toDouble / totalExecutionTime * 100).toLong + val schedulerDelayProportion = + 100 - executorRuntimeProportion - shuffleReadTimeProportion - + shuffleWriteTimeProportion - serializationTimeProportion - + deserializationTimeProportion - gettingResultTimeProportion + + val schedulerDelayProportionPos = 0 + val deserializationTimeProportionPos = + schedulerDelayProportionPos + schedulerDelayProportion + val shuffleReadTimeProportionPos = + deserializationTimeProportionPos + deserializationTimeProportion + val executorRuntimeProportionPos = + shuffleReadTimeProportionPos + shuffleReadTimeProportion + val shuffleWriteTimeProportionPos = + executorRuntimeProportionPos + executorRuntimeProportion + val serializationTimeProportionPos = + shuffleWriteTimeProportionPos + shuffleWriteTimeProportion + val gettingResultTimeProportionPos = + serializationTimeProportionPos + serializationTimeProportion + + val timelineObject = + s""" + |{ + | 'className': 'task task-assignment-timeline-object ${classNameByStatus}', + | 'group': '${executorId}', + | 'content': '
' + + | '${taskIdWithIndexAndAttempt}
' + + | '' + + | '' + + | '' + + | '' + + | '' + + | '' + + | '' + + | '', + | 'start': new Date(${launchTime}), + | 'end': new Date(${finishTime}), + | 'title': '${taskIdWithIndexAndAttempt}\\nStatus: ${taskInfo.status}\\n' + + | 'Launch Time: ${UIUtils.formatDate(new Date(launchTime))}' + + | '${ + if (!isRunning) { + s"""\\nFinish Time: ${UIUtils.formatDate(new Date(finishTime))}""" + } else { + "" + } + }' + |} + """.stripMargin + Option(timelineObject) + } else { + None + } + }.mkString("[", ",", "]") + + val groupArrayStr = executorsSet.map { + case (executorId, host) => + s""" + |{ + | 'id': '${executorId}', + | 'content': '${executorId} / ${host}', + |} + """.stripMargin + }.mkString("[", ",", "]") + + var maxWindowInSec = ((maxFinishTime - minLaunchTime) / 1000.0).round + if (maxWindowInSec <= 0) maxWindowInSec = 1 + val tasksPerSecond = numEffectiveTasks / maxWindowInSec + var maxZoom = { + if (tasksPerSecond > 100) { + 1000L / (tasksPerSecond / 100) + } + else { + 24L * 60 * 60 * 1000 + } + } + + if (maxZoom < 0) maxZoom = 1 + val content = summary ++ showAdditionalMetrics ++ @@ -438,21 +582,71 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{summaryTable.getOrElse("No tasks have reported metrics yet.")}
++

Aggregated Metrics by Executor

++ executorTable.toNodeSeq ++ maybeAccumulableTable ++ -

Tasks

++ taskTable +

Tasks

++ taskTable ++ +

Task Assignment Timeline

++ +
+
+ {taskAssignmentTimelineControlPanel ++ taskAssignmentTimelineLegend} +
+
++ + UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, parent) } } + private val taskAssignmentTimelineControlPanel: Seq[Node] = { +
+
+ + Zoom Lock +
+
+ } + + private val taskAssignmentTimelineLegend: Seq[Node] = { +
+ + + Succeeded Task + + Failed Task + + Running Task + { + val legendPairs = List(("#FFBDD8", "Task Deserialization Time"), + ("#8AC7DE", "Shuffle Read Time"), ("#D9EB52", "Executor Computing Time"), + ("#87796F", "Shuffle Write Time"), ("#93DFB8", "Result Serialization TIme"), + ("#FF9036", "Getting Result Time"), ("#F6D76B", "Scheduler Delay")) + + legendPairs.zipWithIndex.map { + case ((color, name), index) => + + {name} + } + } + +
+ } + def taskRow( hasAccumulators: Boolean, hasInput: Boolean, hasOutput: Boolean, hasShuffleRead: Boolean, hasShuffleWrite: Boolean, - hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = { + hasBytesSpilled: Boolean, + currentTime: Long)(taskData: TaskUIData): Seq[Node] = { taskData match { case TaskUIData(info, metrics, errorMessage) => - val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) + val duration = if (info.status == "RUNNING") info.timeRunning(currentTime) else metrics.map(_.executorRunTime).getOrElse(1L) val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration) else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")