[SPARK-7627] [SPARK-7472] DAG visualization: style skipped stages
This patch fixes two things:

**SPARK-7627.** Cached RDDs no longer light up on the job page. This is a simple fix.
**SPARK-7472.** Display skipped stages differently from normal stages.

The latter addresses a major UX issue: because we link the job viz to the stage viz even for skipped stages, the user may inadvertently click into the stage page of a skipped stage, which is empty.

-------------------
<img src="https://cloud.githubusercontent.com/assets/2133137/7675241/de1a3da6-fcea-11e4-8101-88055cef78c5.png" width="300px" />

Author: Andrew Or <[email protected]>

Closes apache#6171 from andrewor14/dag-viz-skipped and squashes the following commits:

f261797 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped
0eda358 [Andrew Or] Tweak skipped stage border color
c604150 [Andrew Or] Tweak grayscale colors
7010676 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped
762b541 [Andrew Or] Use special prefix for stage clusters to avoid collisions
51c95b9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped
b928cd4 [Andrew Or] Fix potential leak + write tests for it
7c4c364 [Andrew Or] Show skipped stages differently
7cc34ce [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped
c121fa2 [Andrew Or] Fix cache color
Andrew Or committed May 18, 2015
1 parent 814b3da commit 563bfcc
Showing 6 changed files with 352 additions and 108 deletions.
core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
@@ -15,32 +15,21 @@
  * limitations under the License.
  */
 
-#dag-viz-graph svg path {
-  stroke: #444;
-  stroke-width: 1.5px;
-}
-
-#dag-viz-graph svg g.cluster rect {
-  stroke-width: 1px;
-}
-
-#dag-viz-graph svg g.node circle {
-  fill: #444;
+#dag-viz-graph a, #dag-viz-graph a:hover {
+  text-decoration: none;
 }
 
-#dag-viz-graph svg g.node rect {
-  fill: #C3EBFF;
-  stroke: #3EC0FF;
-  stroke-width: 1px;
+#dag-viz-graph .label {
+  font-weight: normal;
+  text-shadow: none;
 }
 
-#dag-viz-graph svg g.node.cached circle {
-  fill: #444;
+#dag-viz-graph svg path {
+  stroke: #444;
+  stroke-width: 1.5px;
 }
 
-#dag-viz-graph svg g.node.cached rect {
-  fill: #B3F5C5;
-  stroke: #56F578;
+#dag-viz-graph svg g.cluster rect {
+  stroke-width: 1px;
 }

@@ -61,12 +50,23 @@
   stroke-width: 1px;
 }
 
-#dag-viz-graph svg.job g.cluster[class*="stage"] rect {
+#dag-viz-graph svg.job g.cluster.skipped rect {
+  fill: #D6D6D6;
+  stroke: #B7B7B7;
+  stroke-width: 1px;
+}
+
+#dag-viz-graph svg.job g.cluster.stage rect {
   fill: #FFFFFF;
   stroke: #FF99AC;
   stroke-width: 1px;
 }
 
+#dag-viz-graph svg.job g.cluster.stage.skipped rect {
+  stroke: #ADADAD;
+  stroke-width: 1px;
+}
+
 #dag-viz-graph svg.job g#cross-stage-edges path {
   fill: none;
 }
@@ -75,6 +75,20 @@
   fill: #333;
 }
 
+#dag-viz-graph svg.job g.cluster.skipped text {
+  fill: #666;
+}
+
+#dag-viz-graph svg.job g.node circle {
+  fill: #444;
+}
+
+#dag-viz-graph svg.job g.node.cached circle {
+  fill: #A3F545;
+  stroke: #52C366;
+  stroke-width: 2px;
+}
+
 /* Stage page specific styles */
 
 #dag-viz-graph svg.stage g.cluster rect {
@@ -83,7 +97,7 @@
   stroke-width: 1px;
 }
 
-#dag-viz-graph svg.stage g.cluster[class*="stage"] rect {
+#dag-viz-graph svg.stage g.cluster.stage rect {
   fill: #FFFFFF;
   stroke: #FFA6B6;
   stroke-width: 1px;
@@ -97,11 +111,14 @@
   fill: #333;
 }
 
-#dag-viz-graph a, #dag-viz-graph a:hover {
-  text-decoration: none;
+#dag-viz-graph svg.stage g.node rect {
+  fill: #C3EBFF;
+  stroke: #3EC0FF;
+  stroke-width: 1px;
 }
 
-#dag-viz-graph .label {
-  font-weight: normal;
-  text-shadow: none;
+#dag-viz-graph svg.stage g.node.cached rect {
+  fill: #B3F5C5;
+  stroke: #52C366;
+  stroke-width: 2px;
 }
50 changes: 32 additions & 18 deletions core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -57,9 +57,7 @@ var VizConstants = {
   stageSep: 40,
   graphPrefix: "graph_",
   nodePrefix: "node_",
-  stagePrefix: "stage_",
-  clusterPrefix: "cluster_",
-  stageClusterPrefix: "cluster_stage_"
+  clusterPrefix: "cluster_"
 };
 
 var JobPageVizConstants = {
@@ -133,9 +131,7 @@ function renderDagViz(forJob) {
   }
 
   // Render
-  var svg = graphContainer()
-    .append("svg")
-    .attr("class", jobOrStage);
+  var svg = graphContainer().append("svg").attr("class", jobOrStage);
   if (forJob) {
     renderDagVizForJob(svg);
   } else {
@@ -185,23 +181,32 @@ function renderDagVizForJob(svgContainer) {
     var dot = metadata.select(".dot-file").text();
     var stageId = metadata.attr("stage-id");
     var containerId = VizConstants.graphPrefix + stageId;
-    // Link each graph to the corresponding stage page (TODO: handle stage attempts)
-    var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0")
-      .find("a")
-      .attr("href") + "&expandDagViz=true";
-    var container = svgContainer
-      .append("a")
-      .attr("xlink:href", stageLink)
-      .append("g")
-      .attr("id", containerId);
+    var isSkipped = metadata.attr("skipped") == "true";
+    var container;
+    if (isSkipped) {
+      container = svgContainer
+        .append("g")
+        .attr("id", containerId)
+        .attr("skipped", "true");
+    } else {
+      // Link each graph to the corresponding stage page (TODO: handle stage attempts)
+      // Use the link from the stage table so it also works for the history server
+      var attemptId = 0
+      var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
+        .select("a")
+        .attr("href") + "&expandDagViz=true";
+      container = svgContainer
+        .append("a")
+        .attr("xlink:href", stageLink)
+        .append("g")
+        .attr("id", containerId);
+    }
 
     // Now we need to shift the container for this stage so it doesn't overlap with
     // existing ones, taking into account the position and width of the last stage's
     // container. We do not need to do this for the first stage of this job.
     if (i > 0) {
-      var existingStages = svgContainer
-        .selectAll("g.cluster")
-        .filter("[class*=\"" + VizConstants.stageClusterPrefix + "\"]");
+      var existingStages = svgContainer.selectAll("g.cluster.stage")
       if (!existingStages.empty()) {
         var lastStage = d3.select(existingStages[0].pop());
         var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));
@@ -214,6 +219,12 @@
     // Actually render the stage
     renderDot(dot, container, true);
 
+    // Mark elements as skipped if appropriate. Unfortunately we need to mark all
+    // elements instead of the parent container because of CSS override rules.
+    if (isSkipped) {
+      container.selectAll("g").classed("skipped", true);
+    }
+
     // Round corners on rectangles
     container
       .selectAll("rect")
@@ -243,6 +254,9 @@ function renderDot(dot, container, forJob) {
   var renderer = new dagreD3.render();
   preprocessGraphLayout(g, forJob);
   renderer(container, g);
+
+  // Find the stage cluster and mark it for styling and post-processing
+  container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
 }
 
 /* -------------------- *
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -352,10 +352,12 @@ private[spark] object UIUtils extends Logging {
       </a>
     </span>
     <div id="dag-viz-graph"></div>
-    <div id="dag-viz-metadata">
+    <div id="dag-viz-metadata" style="display:none">
       {
         graphs.map { g =>
-          <div class="stage-metadata" stage-id={g.rootCluster.id} style="display:none">
+          val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "")
+          val skipped = g.rootCluster.name.contains("skipped").toString
+          <div class="stage-metadata" stage-id={stageId} skipped={skipped}>
             <div class="dot-file">{RDDOperationGraph.makeDotFile(g)}</div>
             { g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
             { g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -52,10 +52,13 @@ private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)
  * This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap),
  * stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
  */
-private[ui] class RDDOperationCluster(val id: String, val name: String) {
+private[ui] class RDDOperationCluster(val id: String, private var _name: String) {
   private val _childNodes = new ListBuffer[RDDOperationNode]
   private val _childClusters = new ListBuffer[RDDOperationCluster]
 
+  def name: String = _name
+  def setName(n: String): Unit = { _name = n }
+
   def childNodes: Seq[RDDOperationNode] = _childNodes.iterator.toSeq
   def childClusters: Seq[RDDOperationCluster] = _childClusters.iterator.toSeq
   def attachChildNode(childNode: RDDOperationNode): Unit = { _childNodes += childNode }
@@ -71,6 +74,8 @@ private[ui] class RDDOperationCluster(val id: String, val name: String) {

 private[ui] object RDDOperationGraph extends Logging {
 
+  val STAGE_CLUSTER_PREFIX = "stage_"
+
   /**
    * Construct a RDDOperationGraph for a given stage.
    *
@@ -88,7 +93,8 @@
     val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
 
     // Root cluster is the stage cluster
-    val stageClusterId = s"stage_${stage.stageId}"
+    // Use a special prefix here to differentiate this cluster from other operation clusters
+    val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId
     val stageClusterName = s"Stage ${stage.stageId}" +
       { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
     val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
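As a hedged sketch of the naming scheme above (`StageInfoLite` is a hypothetical stand-in for Spark's `StageInfo`), the cluster id carries the machine-friendly prefix while the display name carries the attempt suffix:

```scala
object StageClusterNaming {
  val STAGE_CLUSTER_PREFIX = "stage_"

  // Hypothetical stand-in for Spark's StageInfo, to keep the sketch self-contained.
  case class StageInfoLite(stageId: Int, attemptId: Int)

  def idAndName(stage: StageInfoLite): (String, String) = {
    val id = STAGE_CLUSTER_PREFIX + stage.stageId  // e.g. "stage_3"
    val name = s"Stage ${stage.stageId}" +
      (if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})")
    (id, name)
  }

  def main(args: Array[String]): Unit = {
    println(idAndName(StageInfoLite(3, 0)))  // (stage_3,Stage 3)
    println(idAndName(StageInfoLite(3, 1)))  // (stage_3,Stage 3 (attempt 1))
  }
}
```

The special prefix is what lets the JS side strip `stage_` to recover the numeric stage id without colliding with ordinary operation-scope cluster ids.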
core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -27,8 +27,15 @@ import org.apache.spark.ui.SparkUI
  * A SparkListener that constructs a DAG of RDD operations.
  */
 private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
+
+  // Note: the fate of jobs and stages are tied. This means when we clean up a job,
+  // we always clean up all of its stages. Similarly, when we clean up a stage, we
+  // always clean up its job (and, transitively, other stages in the same job).
   private[ui] val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
+  private[ui] val jobIdToSkippedStageIds = new mutable.HashMap[Int, Seq[Int]]
+  private[ui] val stageIdToJobId = new mutable.HashMap[Int, Int]
   private[ui] val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
+  private[ui] val completedStageIds = new mutable.HashSet[Int]
 
   // Keep track of the order in which these are inserted so we can remove old ones
   private[ui] val jobIds = new mutable.ArrayBuffer[Int]
@@ -40,16 +47,23 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
   private val retainedStages =
     conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
 
-  /** Return the graph metadata for the given stage, or None if no such information exists. */
+  /**
+   * Return the graph metadata for all stages in the given job.
+   * An empty list is returned if one or more of its stages has been cleaned up.
+   */
   def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized {
-    val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
-    val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) }
-    // If the metadata for some stages have been removed, do not bother rendering this job
-    if (_stageIds.size != graphs.size) {
-      Seq.empty
-    } else {
-      graphs
+    val skippedStageIds = jobIdToSkippedStageIds.get(jobId).getOrElse(Seq.empty)
+    val graphs = jobIdToStageIds.get(jobId)
+      .getOrElse(Seq.empty)
+      .flatMap { sid => stageIdToGraph.get(sid) }
+    // Mark any skipped stages as such
+    graphs.foreach { g =>
+      val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "").toInt
+      if (skippedStageIds.contains(stageId) && !g.rootCluster.name.contains("skipped")) {
+        g.rootCluster.setName(g.rootCluster.name + " (skipped)")
+      }
     }
+    graphs
   }

   /** Return the graph metadata for the given stage, or None if no such information exists. */
@@ -66,22 +80,68 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
     jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
 
     stageInfos.foreach { stageInfo =>
-      stageIds += stageInfo.stageId
-      stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
-      // Remove state for old stages
-      if (stageIds.size >= retainedStages) {
-        val toRemove = math.max(retainedStages / 10, 1)
-        stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
-        stageIds.trimStart(toRemove)
-      }
+      val stageId = stageInfo.stageId
+      stageIds += stageId
+      stageIdToJobId(stageId) = jobId
+      stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+      trimStagesIfNecessary()
     }
 
+    trimJobsIfNecessary()
+  }
+
+  /** Keep track of stages that have completed. */
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
+    val stageId = stageCompleted.stageInfo.stageId
+    if (stageIdToJobId.contains(stageId)) {
+      // Note: Only do this if the stage has not already been cleaned up
+      // Otherwise, we may never clean this stage from `completedStageIds`
+      completedStageIds += stageCompleted.stageInfo.stageId
+    }
+  }
+
+  /** On job end, find all stages in this job that are skipped and mark them as such. */
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
+    val jobId = jobEnd.jobId
+    jobIdToStageIds.get(jobId).foreach { stageIds =>
+      val skippedStageIds = stageIds.filter { sid => !completedStageIds.contains(sid) }
+      // Note: Only do this if the job has not already been cleaned up
+      // Otherwise, we may never clean this job from `jobIdToSkippedStageIds`
+      jobIdToSkippedStageIds(jobId) = skippedStageIds
+    }
+  }
+
+  /** Clean metadata for old stages if we have exceeded the number to retain. */
+  private def trimStagesIfNecessary(): Unit = {
+    if (stageIds.size >= retainedStages) {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stageIds.take(toRemove).foreach { id => cleanStage(id) }
+      stageIds.trimStart(toRemove)
+    }
+  }
+
-  // Remove state for old jobs
+  /** Clean metadata for old jobs if we have exceeded the number to retain. */
+  private def trimJobsIfNecessary(): Unit = {
     if (jobIds.size >= retainedJobs) {
       val toRemove = math.max(retainedJobs / 10, 1)
-      jobIds.take(toRemove).foreach { id => jobIdToStageIds.remove(id) }
+      jobIds.take(toRemove).foreach { id => cleanJob(id) }
       jobIds.trimStart(toRemove)
     }
   }
+
+  /** Clean metadata for the given stage, its job, and all other stages that belong to the job. */
+  private[ui] def cleanStage(stageId: Int): Unit = {
+    completedStageIds.remove(stageId)
+    stageIdToGraph.remove(stageId)
+    stageIdToJobId.remove(stageId).foreach { jobId => cleanJob(jobId) }
+  }
+
+  /** Clean metadata for the given job and all stages that belong to it. */
+  private[ui] def cleanJob(jobId: Int): Unit = {
+    jobIdToSkippedStageIds.remove(jobId)
+    jobIdToStageIds.remove(jobId).foreach { stageIds =>
+      stageIds.foreach { stageId => cleanStage(stageId) }
+    }
+  }
 
 }
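The listener's skipped-stage bookkeeping reduces to one invariant: a stage is skipped if its job ended without that stage ever completing. Below is a minimal, self-contained sketch of that invariant (illustrative names, not the actual listener, and without the trimming and synchronization the real class needs):

```scala
import scala.collection.mutable

object SkippedStageSketch {
  private val jobIdToStageIds = mutable.HashMap[Int, Seq[Int]]()
  private val completedStageIds = mutable.HashSet[Int]()

  def onJobStart(jobId: Int, stageIds: Seq[Int]): Unit =
    jobIdToStageIds(jobId) = stageIds.sorted

  def onStageCompleted(stageId: Int): Unit =
    completedStageIds += stageId

  // A stage is "skipped" if its job ended without the stage ever completing,
  // e.g. because its output was already available from an earlier job.
  def skippedStages(jobId: Int): Seq[Int] =
    jobIdToStageIds.getOrElse(jobId, Seq.empty).filterNot(completedStageIds.contains)

  def main(args: Array[String]): Unit = {
    onJobStart(0, Seq(0, 1, 2))
    onStageCompleted(1)
    onStageCompleted(2)
    println(skippedStages(0))  // List(0): stage 0 never ran in this job
  }
}
```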