diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css index 1ba344e09500d..4a39dfa93eb45 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css +++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css @@ -34,10 +34,6 @@ div#application-timeline, div#job-timeline { height: 80px; } -#task-assignment-timeline>div.control-panel { - float: left; -} - #task-assignment-timeline>.timeline-header:after { content: ""; clear: both; @@ -61,18 +57,61 @@ div#application-timeline, div#job-timeline { height: 19px; } +rect.scheduler-delay-proportion { + fill: #F6D76B; +} + +rect.deserialization-time-proportion { + fill: #FFBDD8; +} + +rect.shuffle-read-time-proportion { + fill: #8AC7DE; +} + +rect.executor-runtime-proportion { + fill: #D9EB52; +} + +rect.shuffle-write-time-proportion { + fill: #87796F; +} + +rect.serialization-time-proportion { + fill: #93DFB8; +} + +rect.getting-result-time-proportion { + fill: #FF9036; +} + .vis.timeline .item.task.succeeded { background-color: #D5DDF6; } +.legend-area rect.succeeded-task-legend { + fill: #D5DDF6; + stroke: #97B0F8; +} + .vis.timeline .item.task.failed { background-color: #FF5475; } +.legend-area rect.failed-task-legend { + fill: #FF5475; + stroke: #97B0F8; +} + .vis.timeline .item.task.running { background-color: #FDFFCA; } +.legend-area rect.running-task-legend { + fill: #FDFFCA; + stroke: #97B0F8; +} + #application-timeline div.legend-area { margin-top: 5px; } @@ -215,11 +254,20 @@ tr.corresponding-item-hover>td, tr.corresponding-item-hover>th { display: none; } +#task-assignment-timeline.collapsed { + display: none; +} + .control-panel { margin-bottom: 5px; } -span.expand-application-timeline, span.expand-job-timeline { +#task-assignment-timeline .control-panel { + float: left; +} + +span.expand-application-timeline, span.expand-job-timeline, +span.expand-task-assignment-timeline { cursor: pointer; } diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js index 93de3bc6b7adb..3083111b88938 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js +++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js @@ -168,6 +168,41 @@ function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, zo } taskTimeline.setWindow(minLaunchTime, curEnd); setupZoomable('#task-assignment-timeline-zoom-lock', taskTimeline); + + function setupTaskEventAction() { + $(".item.range.task.task-assignment-timeline-object").each(function() { + var getTaskIdx = function(baseElem) { + var taskIdxText = $($(baseElem).find(".task-assignment-timeline-content")[0]).text(); + var taskIdx = taskIdxText.match("Task (\\d+)\\(")[1]; + return taskIdx; + }; + + $(this).hover( + function() { + var id = getTaskIdx(this); + $($(this).find("div.task-assignment-timeline-content")[0]).tooltip("show"); + }, + function() { + var id = getTaskIdx(this); + $($(this).find("div.task-assignment-timeline-content")[0]) + .tooltip("hide"); + } + ); + }); + } + + setupTaskEventAction(); + taskTimeline.on("rangechanged", function(properties) { + setupTaskEventAction(); + }); + + $("span.expand-task-assignment-timeline").click(function() { + $("#task-assignment-timeline").toggleClass('collapsed'); + + // Switch the class of the arrow from open to closed. + $(this).find('.expand-task-assignment-timeline-arrow').toggleClass('arrow-open'); + $(this).find('.expand-task-assignment-timeline-arrow').toggleClass('arrow-closed'); + }); } function setupExecutorEventAction() { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 27edd5eec3d27..b41196b9238af 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -35,6 +35,42 @@ import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { private val listener = parent.listener + private val TIMELINE_LEGEND = { +
+ + + Succeeded + + Failed + + Running + { + val legendPairs = List(("deserialization-time-proportion", "Task Deserialization Time"), + ("shuffle-read-time-proportion", "Shuffle Read Time"), + ("executor-runtime-proportion", "Executor Computing Time"), + ("shuffle-write-time-proportion", "Shuffle Write Time"), + ("serialization-time-proportion", "Result Serialization TIme"), + ("getting-result-time-proportion", "Getting Result Time"), + ("scheduler-delay-proportion", "Scheduler Delay")) + + legendPairs.zipWithIndex.map { + case ((classAttr, name), index) => + + {name} + } + } + +
+ } + + + + def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val parameterId = request.getParameter("id") @@ -434,207 +470,199 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val maybeAccumulableTable: Seq[Node] = if (accumulables.size > 0) {

Accumulators

++ accumulableTable } else Seq() - val executorsSet = new HashSet[(String, String)] - - var minLaunchTime = Long.MaxValue - var maxFinishTime = Long.MinValue - var numEffectiveTasks = 0 - val executorsArrayStr = stageData.taskData.flatMap { - case (_, taskUIData) => - val taskInfo = taskUIData.taskInfo - - val executorId = taskInfo.executorId - val host = taskInfo.host - executorsSet += ((executorId, host)) - - val taskId = taskInfo.taskId - val taskIdWithIndexAndAttempt = s"Task ${taskId}(${taskInfo.id})" - - val isSucceeded = taskInfo.successful - val isFailed = taskInfo.failed - val isRunning = taskInfo.running - val classNameByStatus = { - if (isSucceeded) { - "succeeded" - } else if (isFailed) { - "failed" - } else if (isRunning) { - "running" - } - } - - if (isSucceeded || isRunning || isFailed) { - val launchTime = taskInfo.launchTime - val finishTime = if (!isRunning) taskInfo.finishTime else currentTime - val totalExecutionTime = finishTime - launchTime - minLaunchTime = launchTime.min(minLaunchTime) - maxFinishTime = launchTime.max(maxFinishTime) - numEffectiveTasks += 1 - - val metricsOpt = taskUIData.taskMetrics - val shuffleReadTime = - metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L).toDouble - val shuffleReadTimeProportion = - (shuffleReadTime / totalExecutionTime * 100).toLong - val shuffleWriteTime = - metricsOpt.flatMap(_.shuffleWriteMetrics.map(_.shuffleWriteTime)).getOrElse(0L) / 1e6 - val shuffleWriteTimeProportion = - (shuffleWriteTime / totalExecutionTime * 100).toLong - val executorRuntimeProportion = - ((metricsOpt.map(_.executorRunTime).getOrElse(0L) - - shuffleReadTime - shuffleWriteTime) / totalExecutionTime * 100).toLong - val serializationTimeProportion = - (metricsOpt.map(_.resultSerializationTime).getOrElse(0L).toDouble / - totalExecutionTime * 100).toLong - val deserializationTimeProportion = - (metricsOpt.map(_.executorDeserializeTime).getOrElse(0L).toDouble / - totalExecutionTime * 100).toLong - val gettingResultTimeProportion = - (getGettingResultTime(taskUIData.taskInfo).toDouble / totalExecutionTime * 100).toLong - val schedulerDelayProportion = - 100 - executorRuntimeProportion - shuffleReadTimeProportion - - shuffleWriteTimeProportion - serializationTimeProportion - - deserializationTimeProportion - gettingResultTimeProportion - - val schedulerDelayProportionPos = 0 - val deserializationTimeProportionPos = - schedulerDelayProportionPos + schedulerDelayProportion - val shuffleReadTimeProportionPos = - deserializationTimeProportionPos + deserializationTimeProportion - val executorRuntimeProportionPos = - shuffleReadTimeProportionPos + shuffleReadTimeProportion - val shuffleWriteTimeProportionPos = - executorRuntimeProportionPos + executorRuntimeProportion - val serializationTimeProportionPos = - shuffleWriteTimeProportionPos + shuffleWriteTimeProportion - val gettingResultTimeProportionPos = - serializationTimeProportionPos + serializationTimeProportion - - val timelineObject = - s""" - |{ - | 'className': 'task task-assignment-timeline-object ${classNameByStatus}', - | 'group': '${executorId}', - | 'content': '
' + - | '${taskIdWithIndexAndAttempt}
' + - | '' + - | '' + - | '' + - | '' + - | '' + - | '' + - | '' + - | '', - | 'start': new Date(${launchTime}), - | 'end': new Date(${finishTime}), - | 'title': '${taskIdWithIndexAndAttempt}\\nStatus: ${taskInfo.status}\\n' + - | 'Launch Time: ${UIUtils.formatDate(new Date(launchTime))}' + - | '${ - if (!isRunning) { - s"""\\nFinish Time: ${UIUtils.formatDate(new Date(finishTime))}""" - } else { - "" - } - }' - |} - """.stripMargin - Option(timelineObject) - } else { - None - } - }.mkString("[", ",", "]") - - val groupArrayStr = executorsSet.map { - case (executorId, host) => - s""" - |{ - | 'id': '${executorId}', - | 'content': '${executorId} / ${host}', - |} - """.stripMargin - }.mkString("[", ",", "]") - - var maxWindowInSec = ((maxFinishTime - minLaunchTime) / 1000.0).round - if (maxWindowInSec <= 0) maxWindowInSec = 1 - val tasksPerSecond = numEffectiveTasks / maxWindowInSec - var maxZoom = { - if (tasksPerSecond > 100) { - 1000L / (tasksPerSecond / 100) - } - else { - 24L * 60 * 60 * 1000 - } - } - - if (maxZoom < 0) maxZoom = 1 - val content = summary ++ showAdditionalMetrics ++ + makeTimeline(stageData.taskData.values.toSeq, currentTime) ++

Summary Metrics for {numCompleted} Completed Tasks

++
{summaryTable.getOrElse("No tasks have reported metrics yet.")}
++

Aggregated Metrics by Executor

++ executorTable.toNodeSeq ++ maybeAccumulableTable ++ -

Tasks

++ taskTable ++ -

Task Assignment Timeline

++ -
-
- {taskAssignmentTimelineControlPanel ++ taskAssignmentTimelineLegend} -
-
++ - - +

Tasks

++ taskTable UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, parent) } } - private val taskAssignmentTimelineControlPanel: Seq[Node] = { -
-
- - Zoom Lock -
-
- } + def makeTimeline(tasks: Seq[TaskUIData], currentTime: Long): Seq[Node] = { + val executorsSet = new HashSet[(String, String)] + + var minLaunchTime = Long.MaxValue + var maxFinishTime = Long.MinValue + var numEffectiveTasks = 0 + + val executorsArrayStr = tasks.filter { taskUIData => + val taskInfo = taskUIData.taskInfo + taskInfo.successful || taskInfo.running || taskInfo.failed + }.map { taskUIData => + val taskInfo = taskUIData.taskInfo + val executorId = taskInfo.executorId + val host = taskInfo.host + executorsSet += ((executorId, host)) + + val taskId = taskInfo.taskId + val taskIdWithIndexAndAttempt = s"Task ${taskId}(${taskInfo.id})" + + val isSucceeded = taskInfo.successful + val isFailed = taskInfo.failed + val isRunning = taskInfo.running + val classNameByStatus = { + if (isSucceeded) { + "succeeded" + } else if (isFailed) { + "failed" + } else if (isRunning) { + "running" + } + } - private val taskAssignmentTimelineLegend: Seq[Node] = { -
- - - Succeeded Task - - Failed Task - - Running Task - { - val legendPairs = List(("#FFBDD8", "Task Deserialization Time"), - ("#8AC7DE", "Shuffle Read Time"), ("#D9EB52", "Executor Computing Time"), - ("#87796F", "Shuffle Write Time"), ("#93DFB8", "Result Serialization TIme"), - ("#FF9036", "Getting Result Time"), ("#F6D76B", "Scheduler Delay")) + val launchTime = taskInfo.launchTime + val finishTime = if (!isRunning) taskInfo.finishTime else currentTime + val totalExecutionTime = finishTime - launchTime + minLaunchTime = launchTime.min(minLaunchTime) + maxFinishTime = launchTime.max(maxFinishTime) + numEffectiveTasks += 1 + + val metricsOpt = taskUIData.taskMetrics + val shuffleReadTime = + metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L) + val shuffleReadTimeProportion = + (shuffleReadTime.toDouble / totalExecutionTime * 100).toLong + val shuffleWriteTime = + (metricsOpt.flatMap(_.shuffleWriteMetrics + .map(_.shuffleWriteTime)).getOrElse(0L) / 1e6).toLong + val shuffleWriteTimeProportion = + (shuffleWriteTime.toDouble / totalExecutionTime * 100).toLong + val executorComputingTime = metricsOpt.map(_.executorRunTime).getOrElse(0L) - + shuffleReadTime - shuffleWriteTime + val executorComputingTimeProportion = + (executorComputingTime / totalExecutionTime * 100).toLong + val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L) + val serializationTimeProportion = + (serializationTime.toDouble / totalExecutionTime * 100).toLong + val deserializationTime = metricsOpt.map(_.executorDeserializeTime).getOrElse(0L) + val deserializationTimeProportion = + (deserializationTime.toDouble / totalExecutionTime * 100).toLong + val gettingResultTime = getGettingResultTime(taskUIData.taskInfo) + val gettingResultTimeProportion = + (gettingResultTime.toDouble / totalExecutionTime * 100).toLong + val schedulerDelay = totalExecutionTime - + (executorComputingTime + shuffleReadTime + shuffleWriteTime + + serializationTime + deserializationTime + gettingResultTime) + val schedulerDelayProportion = + 100 - executorComputingTimeProportion - shuffleReadTimeProportion - + shuffleWriteTimeProportion - serializationTimeProportion - + deserializationTimeProportion - gettingResultTimeProportion + + val schedulerDelayProportionPos = 0 + val deserializationTimeProportionPos = + schedulerDelayProportionPos + schedulerDelayProportion + val shuffleReadTimeProportionPos = + deserializationTimeProportionPos + deserializationTimeProportion + val executorRuntimeProportionPos = + shuffleReadTimeProportionPos + shuffleReadTimeProportion + val shuffleWriteTimeProportionPos = + executorRuntimeProportionPos + executorComputingTimeProportion + val serializationTimeProportionPos = + shuffleWriteTimeProportionPos + shuffleWriteTimeProportion + val gettingResultTimeProportionPos = + serializationTimeProportionPos + serializationTimeProportion + + val timelineObject = + s""" + |{ + | 'className': 'task task-assignment-timeline-object ${classNameByStatus}', + | 'group': '${executorId}', + | 'content': '
Finish Time: ${UIUtils.formatDate(new Date(finishTime))}""" + } else { + "" + } + }' + + '
Scheduler Delay: ${schedulerDelay} ms' + + '
Task Deserialization Time: ${UIUtils.formatDuration(deserializationTime)}' + + '
Shuffle Read Time: ${UIUtils.formatDuration(shuffleReadTime)}' + + '
Executor Computing Time: ${UIUtils.formatDuration(executorComputingTime)}' + + '
Shuffle Write Time: ${UIUtils.formatDuration(shuffleWriteTime)}' + + '
Result Serialization Time: ${UIUtils.formatDuration(serializationTime)}' + + '
Getting Result Time: ${UIUtils.formatDuration(gettingResultTime)}">' + + | '${taskIdWithIndexAndAttempt}
' + + | '' + + | '' + + | '' + + | '' + + | '' + + | '' + + | '' + + | '', + | 'start': new Date(${launchTime}), + | 'end': new Date(${finishTime}) + |} + """.stripMargin + timelineObject + }.mkString("[", ",", "]") + + val groupArrayStr = executorsSet.map { + case (executorId, host) => + s""" + |{ + | 'id': '${executorId}', + | 'content': '${executorId} / ${host}', + |} + """.stripMargin + }.mkString("[", ",", "]") + + var maxWindowInSec = ((maxFinishTime - minLaunchTime) / 1000.0).round + if (maxWindowInSec <= 0) maxWindowInSec = 1 + val tasksPerSecond = numEffectiveTasks / maxWindowInSec + var maxZoom = { + if (tasksPerSecond > 100) { + 1000L / (tasksPerSecond / 100) + } + else { + 24L * 60 * 60 * 1000 + } + } - legendPairs.zipWithIndex.map { - case ((color, name), index) => - - {name} - } - } - -
+ if (maxZoom < 0) maxZoom = 1 + + + + Event Timeline + ++ + ++ + } def taskRow(