Skip to content

Commit

Permalink
Removed stage timeline
Browse files Browse the repository at this point in the history
  • Loading branch information
sarutak committed Apr 22, 2015
1 parent fc1696c commit aeed4b1
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,63 +23,10 @@ div#application-timeline, div#job-timeline {
margin-top: 20px;
}

#task-assignment-timeline div.legend-area {
float: right;
border: 1px solid #000000;
}

#task-assignment-timeline .legend-area>svg {
width: 600px;
height: 80px;
}

#task-assignment-timeline>div.control-panel {
float: left;
}

.control-panel input+span {
cursor: pointer;
}

#task-assignment-timeline>.timeline-header:after {
content: "";
clear: both;
height: 0;
display: block;
visibility: hidden;
}

#task-assignment-timeline div.item.range {
padding: 0px;
height: 40px;
}

.task-assignment-timeline-content {
width: 100%;
height: 19px;
}

.task-assignment-timeline-duration-bar {
width: 100%;
height: 19px;
}

.vis.timeline div.content {
width: 100%;
}

.vis.timeline .item.task.succeeded {
background-color: #D5DDF6;
}

.vis.timeline .item.task.failed {
background-color: #FF5475;
}

.vis.timeline .item.task.running {
background-color: #FDFFCA;
}

.vis.timeline .item.stage.succeeded {
background-color: #D5DDF6;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,34 +57,6 @@ function drawJobTimeline(groupArray, eventObjArray) {
setupZoomable("#job-timeline-zoom-lock", jobTimeline);
}

function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, zoomMax) {
var groups = new vis.DataSet(groupArray);
var items = new vis.DataSet(eventObjArray);
var container = $("#task-assignment-timeline")[0]
var options = {
groupOrder: function(a, b) {
return a.value - b.value
},
editable: false,
align: 'left',
selectable: false,
showCurrentTime: false,
zoomable: false,
zoomMax: zoomMax
};

var taskTimeline = new vis.Timeline(container)
taskTimeline.setOptions(options);
taskTimeline.setGroups(groups);
taskTimeline.setItems(items);
var curEnd = taskTimeline.getWindow()["end"].getTime();
if (curEnd - minLaunchTime > zoomMax) {
curEnd = minLaunchTime + zoomMax;
}
taskTimeline.setWindow(minLaunchTime, curEnd);
setupZoomable('#task-assignment-timeline-zoom-lock', taskTimeline);
}

function setupZoomable(id, timeline) {
$(id + '>input[type="checkbox"]').click(function() {
if (this.checked) {
Expand Down
202 changes: 4 additions & 198 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.ui.jobs
import java.util.Date
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable.HashSet
import scala.xml.{Elem, Node, Unparsed}

import org.apache.commons.lang3.StringEscapeUtils
Expand Down Expand Up @@ -209,12 +208,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {

val unzipped = taskHeadersAndCssClasses.unzip

val currentTime = System.currentTimeMillis()
val taskTable = UIUtils.listingTable(
unzipped._1,
taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput,
stageData.hasShuffleRead, stageData.hasShuffleWrite,
stageData.hasBytesSpilled, currentTime),
stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled),
tasks,
headerClasses = unzipped._2)
// Excludes tasks which failed and have incomplete metrics
Expand Down Expand Up @@ -434,219 +431,28 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val maybeAccumulableTable: Seq[Node] =
if (accumulables.size > 0) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()

val executorsSet = new HashSet[(String, String)]

var minLaunchTime = Long.MaxValue
var maxFinishTime = Long.MinValue
var numEffectiveTasks = 0
val executorsArrayStr = stageData.taskData.flatMap {
case (_, taskUIData) =>
val taskInfo = taskUIData.taskInfo

val executorId = taskInfo.executorId
val host = taskInfo.host
executorsSet += ((executorId, host))

val taskId = taskInfo.taskId
val taskIdWithIndexAndAttempt = s"Task ${taskId}(${taskInfo.id})"

val isSucceeded = taskInfo.successful
val isFailed = taskInfo.failed
val isRunning = taskInfo.running
val classNameByStatus = {
if (isSucceeded) {
"succeeded"
} else if (isFailed) {
"failed"
} else if (isRunning) {
"running"
}
}

if (isSucceeded || isRunning || isFailed) {
val launchTime = taskInfo.launchTime
val finishTime = if (!isRunning) taskInfo.finishTime else currentTime
val totalExecutionTime = finishTime - launchTime
minLaunchTime = launchTime.min(minLaunchTime)
maxFinishTime = launchTime.max(maxFinishTime)
numEffectiveTasks += 1

val metricsOpt = taskUIData.taskMetrics
val shuffleReadTime =
metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L).toDouble
val shuffleReadTimeProportion =
(shuffleReadTime / totalExecutionTime * 100).toLong
val shuffleWriteTime =
metricsOpt.flatMap(_.shuffleWriteMetrics.map(_.shuffleWriteTime)).getOrElse(0L) / 1e6
val shuffleWriteTimeProportion =
(shuffleWriteTime / totalExecutionTime * 100).toLong
val executorRuntimeProportion =
((metricsOpt.map(_.executorRunTime).getOrElse(0L) -
shuffleReadTime - shuffleWriteTime) / totalExecutionTime * 100).toLong
val serializationTimeProportion =
(metricsOpt.map(_.resultSerializationTime).getOrElse(0L).toDouble /
totalExecutionTime * 100).toLong
val deserializationTimeProportion =
(metricsOpt.map(_.executorDeserializeTime).getOrElse(0L).toDouble /
totalExecutionTime * 100).toLong
val gettingResultTimeProportion =
(getGettingResultTime(taskUIData.taskInfo).toDouble / totalExecutionTime * 100).toLong
val schedulerDelayProportion =
100 - executorRuntimeProportion - shuffleReadTimeProportion -
shuffleWriteTimeProportion - serializationTimeProportion -
deserializationTimeProportion - gettingResultTimeProportion

val schedulerDelayProportionPos = 0
val deserializationTimeProportionPos =
schedulerDelayProportionPos + schedulerDelayProportion
val shuffleReadTimeProportionPos =
deserializationTimeProportionPos + deserializationTimeProportion
val executorRuntimeProportionPos =
shuffleReadTimeProportionPos + shuffleReadTimeProportion
val shuffleWriteTimeProportionPos =
executorRuntimeProportionPos + executorRuntimeProportion
val serializationTimeProportionPos =
shuffleWriteTimeProportionPos + shuffleWriteTimeProportion
val gettingResultTimeProportionPos =
serializationTimeProportionPos + serializationTimeProportion

val timelineObject =
s"""
|{
| 'className': 'task task-assignment-timeline-object ${classNameByStatus}',
| 'group': '${executorId}',
| 'content': '<div class="task-assignment-timeline-content">' +
| '${taskIdWithIndexAndAttempt}</div>' +
| '<svg class="task-assignment-timeline-duration-bar">' +
| '<rect x="${schedulerDelayProportionPos}%" y="0" height="100%"' +
| 'width="${schedulerDelayProportion}%" fill="#F6D76B"></rect>' +
| '<rect x="${deserializationTimeProportionPos}%" y="0" height="100%"' +
| 'width="${deserializationTimeProportion}%" fill="#FFBDD8"></rect>' +
| '<rect x="${shuffleReadTimeProportionPos}%" y="0" height="100%"' +
| 'width="${shuffleReadTimeProportion}%" fill="#8AC7DE"></rect>' +
| '<rect x="${executorRuntimeProportionPos}%" y="0" height="100%"' +
| 'width="${executorRuntimeProportion}%" fill="#D9EB52"></rect>' +
| '<rect x="${shuffleWriteTimeProportionPos}%" y="0" height="100%"' +
| 'width="${shuffleWriteTimeProportion}%" fill="#87796F"></rect>' +
| '<rect x="${serializationTimeProportionPos}%" y="0" height="100%"' +
| 'width="${serializationTimeProportion}%" fill="#93DFB8"></rect>' +
| '<rect x="${gettingResultTimeProportionPos}%" y="0" height="100%"' +
| 'width="${gettingResultTimeProportion}%" fill="#FF9036"></rect></svg>',
| 'start': new Date(${launchTime}),
| 'end': new Date(${finishTime}),
| 'title': '${taskIdWithIndexAndAttempt}\\nStatus: ${taskInfo.status}\\n' +
| 'Launch Time: ${UIUtils.formatDate(new Date(launchTime))}' +
| '${
if (!isRunning) {
s"""\\nFinish Time: ${UIUtils.formatDate(new Date(finishTime))}"""
} else {
""
}
}'
|}
""".stripMargin
Option(timelineObject)
} else {
None
}
}.mkString("[", ",", "]")

val groupArrayStr = executorsSet.map {
case (executorId, host) =>
s"""
|{
| 'id': '${executorId}',
| 'content': '${executorId} / ${host}',
|}
""".stripMargin
}.mkString("[", ",", "]")

var maxWindowInSec = ((maxFinishTime - minLaunchTime) / 1000.0).round
if (maxWindowInSec <= 0) maxWindowInSec = 1
val tasksPerSecond = numEffectiveTasks / maxWindowInSec
var maxZoom = {
if (tasksPerSecond > 100) {
1000L / (tasksPerSecond / 100)
}
else {
24L * 60 * 60 * 1000
}
}

if (maxZoom < 0) maxZoom = 1

val content =
summary ++
showAdditionalMetrics ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
maybeAccumulableTable ++
<h4>Tasks</h4> ++ taskTable ++
<h4>Task Assignment Timeline</h4> ++
<div id="task-assignment-timeline">
<div class="timeline-header">
{taskAssignmentTimelineControlPanel ++ taskAssignmentTimelineLegend}
</div>
</div> ++
<script type="text/javascript">
{Unparsed(s"drawTaskAssignmentTimeline(" +
s"${groupArrayStr}, ${executorsArrayStr}, ${minLaunchTime}, ${maxZoom})")}
</script>
<h4>Tasks</h4> ++ taskTable

UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, parent)
}
}

private val taskAssignmentTimelineControlPanel: Seq[Node] = {
<div class="control-panel">
<div id="task-assignment-timeline-zoom-lock">
<input type="checkbox" checked="checked"></input>
<span>Zoom Lock</span>
</div>
</div>
}

private val taskAssignmentTimelineLegend: Seq[Node] = {
<div class="legend-area">
<svg>
<rect x="5px" y="5px" width="20px"
height="15px" rx="2px" fill="#D5DDF6" stroke="#97B0F8"></rect>
<text x="35px" y="17px">Succeeded Task</text>
<rect x="215px" y="5px" width="20px"
height="15px" rx="2px" fill="#FF5475" stroke="#97B0F8"></rect>
<text x="245px" y="17px">Failed Task</text>
<rect x="425px" y="5px" width="20px"
height="15px" rx="2px" fill="#FDFFCA" stroke="#97B0F8"></rect>
<text x="455px" y="17px">Running Task</text>
{
val legendPairs = List(("#FFBDD8", "Task Deserialization Time"),
("#8AC7DE", "Shuffle Read Time"), ("#D9EB52", "Executor Computing Time"),
("#87796F", "Shuffle Write Time"), ("#93DFB8", "Result Serialization TIme"),
("#FF9036", "Getting Result Time"), ("#F6D76B", "Scheduler Delay"))

legendPairs.zipWithIndex.map {
case ((color, name), index) =>
<rect x={5 + (index / 3) * 210 + "px"} y={35 + (index % 3) * 15 + "px"}
width="10px" height="10px" fill={color}></rect>
<text x={25 + (index / 3) * 210 + "px"}
y={45 + (index % 3) * 15 + "px"}>{name}</text>
}
}
</svg>
</div>
}

def taskRow(
hasAccumulators: Boolean,
hasInput: Boolean,
hasOutput: Boolean,
hasShuffleRead: Boolean,
hasShuffleWrite: Boolean,
hasBytesSpilled: Boolean,
currentTime: Long)(taskData: TaskUIData): Seq[Node] = {
hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = {
taskData match { case TaskUIData(info, metrics, errorMessage) =>
val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(_.executorRunTime).getOrElse(1L)
val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
Expand Down

0 comments on commit aeed4b1

Please sign in to comment.