
merge master
mengxr committed May 13, 2015
2 parents 697fdf9 + 77f64c7 commit 7bde7ae
Showing 86 changed files with 4,358 additions and 660 deletions.
47 changes: 0 additions & 47 deletions core/pom.xml
@@ -381,35 +381,6 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
<configuration>
<target>
<unzip src="../python/lib/py4j-0.8.2.1-src.zip" dest="../python/build" />
</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<configuration>
<filesets>
<fileset>
<directory>${basedir}/../python/build</directory>
</fileset>
</filesets>
<verbose>true</verbose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
@@ -438,24 +409,6 @@
</executions>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>../python</directory>
<includes>
<include>pyspark/*.py</include>
</includes>
</resource>
<resource>
<directory>../python/build</directory>
<includes>
<include>py4j/*.py</include>
</includes>
</resource>
</resources>
</build>

<profiles>

Large diffs are not rendered by default.

80 changes: 33 additions & 47 deletions core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -140,8 +140,6 @@ function renderDagViz(forJob) {
svg.selectAll("#" + nodeId).classed("cached", true);
});

// More post-processing
drawClusterLabels(svg, forJob);
resizeSvg(svg);
}

@@ -151,7 +149,7 @@ function renderDagVizForStage(svgContainer) {
var dot = metadata.select(".dot-file").text();
var containerId = VizConstants.graphPrefix + metadata.attr("stage-id");
var container = svgContainer.append("g").attr("id", containerId);
renderDot(dot, container, StagePageVizConstants.rankSep);
renderDot(dot, container, false);

// Round corners on rectangles
svgContainer
@@ -209,7 +207,7 @@ function renderDagVizForJob(svgContainer) {
}

// Actually render the stage
renderDot(dot, container, JobPageVizConstants.rankSep);
renderDot(dot, container, true);

// Round corners on rectangles
container
@@ -231,14 +229,14 @@ function renderDagVizForJob(svgContainer) {
}

/* Render the dot file as an SVG in the given container. */
function renderDot(dot, container, rankSep) {
function renderDot(dot, container, forJob) {
var escaped_dot = dot
.replace(/&lt;/g, "<")
.replace(/&gt;/g, ">")
.replace(/&quot;/g, "\"");
var g = graphlibDot.read(escaped_dot);
g.graph().rankSep = rankSep;
var renderer = new dagreD3.render();
preprocessGraphLayout(g, forJob);
renderer(container, g);
}

@@ -251,50 +249,38 @@ function graphContainer() { return d3.select("#dag-viz-graph"); }
function metadataContainer() { return d3.select("#dag-viz-metadata"); }

/*
* Helper function to create draw a label for each cluster.
*
* We need to do this manually because dagre-d3 does not support labeling clusters.
* In general, the clustering support for dagre-d3 is quite limited at this point.
* Helper function to pre-process the graph layout.
* This step is necessary for certain styles that affect the positioning
* and sizes of graph elements, e.g. padding, font style, shape.
*/
function drawClusterLabels(svgContainer, forJob) {
var clusterLabelSize, stageClusterLabelSize;
function preprocessGraphLayout(g, forJob) {
var nodes = g.nodes();
for (var i = 0; i < nodes.length; i++) {
var isCluster = g.children(nodes[i]).length > 0;
if (!isCluster) {
var node = g.node(nodes[i]);
if (forJob) {
// Do not display RDD name on job page
node.shape = "circle";
node.labelStyle = "font-size: 0px";
} else {
node.labelStyle = "font-size: 12px";
}
node.padding = "5";
}
}
// Curve the edges
var edges = g.edges();
for (var j = 0; j < edges.length; j++) {
var edge = g.edge(edges[j]);
edge.lineInterpolate = "basis";
}
// Adjust vertical separation between nodes
if (forJob) {
clusterLabelSize = JobPageVizConstants.clusterLabelSize;
stageClusterLabelSize = JobPageVizConstants.stageClusterLabelSize;
g.graph().rankSep = JobPageVizConstants.rankSep;
} else {
clusterLabelSize = StagePageVizConstants.clusterLabelSize;
stageClusterLabelSize = StagePageVizConstants.stageClusterLabelSize;
g.graph().rankSep = StagePageVizConstants.rankSep;
}
svgContainer.selectAll("g.cluster").each(function() {
var cluster = d3.select(this);
var isStage = cluster.attr("id").indexOf(VizConstants.stageClusterPrefix) > -1;
var labelSize = isStage ? stageClusterLabelSize : clusterLabelSize;
drawClusterLabel(cluster, labelSize);
});
}

/*
* Helper function to draw a label for the given cluster element based on its name.
*
* In the process, we need to expand the bounding box to make room for the label.
* We need to do this because dagre-d3 did not take this into account when it first
* rendered the bounding boxes. Note that this means we need to adjust the view box
* of the SVG afterwards since we shifted a few boxes around.
*/
function drawClusterLabel(d3cluster, fontSize) {
var cluster = d3cluster;
var rect = d3cluster.select("rect");
rect.attr("y", toFloat(rect.attr("y")) - fontSize);
rect.attr("height", toFloat(rect.attr("height")) + fontSize);
var labelX = toFloat(rect.attr("x")) + toFloat(rect.attr("width")) - fontSize / 2;
var labelY = toFloat(rect.attr("y")) + fontSize * 1.5;
var labelText = cluster.attr("name").replace(VizConstants.clusterPrefix, "");
cluster.append("text")
.attr("x", labelX)
.attr("y", labelY)
.attr("text-anchor", "end")
.style("font-size", fontSize + "px")
.text(labelText);
}

/*
@@ -444,7 +430,7 @@ function addTooltipsForRDDs(svgContainer) {
if (tooltipText) {
node.select("circle")
.attr("data-toggle", "tooltip")
.attr("data-placement", "right")
.attr("data-placement", "bottom")
.attr("title", tooltipText)
}
});
core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -31,7 +31,7 @@ private[spark] object PythonUtils {
def sparkPythonPath: String = {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
@@ -53,4 +53,11 @@ private[spark] object PythonUtils {
def toSeq[T](cols: JList[T]): Seq[T] = {
cols.toList.toSeq
}

/**
* Convert java map of K, V into Map of K, V (for calling API with varargs)
*/
def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = {
jm.toMap
}
}
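
For context (not part of this diff): the new toScalaMap helper converts a java.util.Map arriving over the Py4J bridge into an immutable Scala Map so it can be handed to APIs that expect Scala collections. A minimal standalone sketch of the same conversion follows; the object and variable names are illustrative only, and it relies on the scala.collection.JavaConversions implicits that make jm.toMap compile.

import scala.collection.JavaConversions._

// Illustrative sketch only: mirrors PythonUtils.toScalaMap outside of Spark.
object ToScalaMapSketch {
  def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = jm.toMap

  def main(args: Array[String]): Unit = {
    val jm = new java.util.HashMap[String, Int]()
    jm.put("a", 1)
    jm.put("b", 2)
    // Prints Map(a -> 1, b -> 2), an immutable Scala Map built from the Java map.
    println(toScalaMap(jm))
  }
}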
52 changes: 41 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,22 +22,22 @@ import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Comparator}

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.spark.{Logging, SparkConf, SparkException}

/**
* :: DeveloperApi ::
@@ -199,13 +199,43 @@ class SparkHadoopUtil extends Logging {
* that file.
*/
def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
def recurse(path: Path): Array[FileStatus] = {
val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
listLeafStatuses(fs, fs.getFileStatus(basePath))
}

/**
* Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
* given path points to a file, return a single-element collection containing [[FileStatus]] of
* that file.
*/
def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
}

val baseStatus = fs.getFileStatus(basePath)
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
}

def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
listLeafDirStatuses(fs, fs.getFileStatus(basePath))
}

def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
}

assert(baseStatus.isDir)
recurse(baseStatus)
}

def globPath(pattern: Path): Seq[Path] = {
val fs = pattern.getFileSystem(conf)
Option(fs.globStatus(pattern)).map { statuses =>
statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
}.getOrElse(Seq.empty[Path])
}

/**
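A side note, not part of the commit: the refactored listLeafStatuses recurses on FileStatus objects instead of Paths, which saves one getFileStatus lookup per directory visited. A rough self-contained sketch of the same recursion against a local Hadoop FileSystem (assumptions: hadoop-common on the classpath, /tmp exists, names are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Sketch only: the same leaf-file recursion as SparkHadoopUtil.listLeafStatuses.
object LeafStatusSketch {
  def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
    if (!baseStatus.isDir) {
      Seq(baseStatus)
    } else {
      val (directories, leaves) = fs.listStatus(baseStatus.getPath).partition(_.isDir)
      leaves ++ directories.flatMap(dir => listLeafStatuses(fs, dir))
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val base = fs.getFileStatus(new Path("/tmp"))   // assumption: any existing directory
    listLeafStatuses(fs, base).foreach(status => println(status.getPath))
  }
}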
16 changes: 9 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1523,13 +1523,15 @@ abstract class RDD[T: ClassTag](
* has completed (therefore the RDD has been materialized and potentially stored in memory).
* doCheckpoint() is called recursively on the parent RDDs.
*/
private[spark] def doCheckpoint() {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
checkpointData.get.doCheckpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", false, true) {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
checkpointData.get.doCheckpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}
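For orientation, and not shown in this diff: doCheckpoint() is invoked by Spark itself once a job finishes; the change above wraps that work in an RDDOperationScope named "checkpoint" so RDDs materialized during checkpointing are grouped under that scope (for example in the DAG visualization). From user code the flow is unchanged; a small usage sketch, where the app name, master, and checkpoint path are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Usage sketch, not part of the commit.
object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/spark-checkpoints")   // assumption: a writable local directory
    val rdd = sc.parallelize(1 to 100).map(_ * 2)
    rdd.checkpoint()                                // mark the RDD for checkpointing
    rdd.count()                                     // job runs; doCheckpoint() fires after it completes
    sc.stop()
  }
}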
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -194,11 +194,7 @@ private[spark] object UIUtils extends Logging {
<a href={prependBaseUri(activeTab.basePath, "/" + tab.prefix + "/")}>{tab.name}</a>
</li>
}
val helpButton: Seq[Node] = helpText.map { helpText =>
<sup>
(<a data-toggle="tooltip" data-placement="bottom" title={helpText}>?</a>)
</sup>
}.getOrElse(Seq.empty)
val helpButton: Seq[Node] = helpText.map(tooltip(_, "bottom")).getOrElse(Seq.empty)

<html>
<head>
@@ -360,7 +356,7 @@ private[spark] object UIUtils extends Logging {
{
graphs.map { g =>
<div class="stage-metadata" stage-id={g.rootCluster.id} style="display:none">
<div class="dot-file">{RDDOperationGraph.makeDotFile(g, forJob)}</div>
<div class="dot-file">{RDDOperationGraph.makeDotFile(g)}</div>
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
{
@@ -375,6 +371,12 @@ private[spark] object UIUtils extends Logging {
</div>
}

def tooltip(text: String, position: String): Seq[Node] = {
<sup>
(<a data-toggle="tooltip" data-placement={position} title={text}>?</a>)
</sup>
}

/** Return a script element that automatically expands the DAG visualization on page load. */
def expandDagVizOnLoad(forJob: Boolean): Seq[Node] = {
<script type="text/javascript">
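Aside, not part of the diff: the new tooltip(text, position) helper factors the "(?)" superscript markup out of the page header so other pages can reuse it with a different placement. A standalone sketch of equivalent markup, assuming scala-xml is available; the object name is illustrative:

import scala.xml.Node

// Sketch of the factored-out helper; same markup, outside Spark.
object TooltipSketch {
  def tooltip(text: String, position: String): Seq[Node] = {
    <sup>
      (<a data-toggle="tooltip" data-placement={position} title={text}>?</a>)
    </sup>
  }

  def main(args: Array[String]): Unit = {
    // Produces a "(?)" superscript whose Bootstrap tooltip shows the text at the given placement.
    println(tooltip("Shows this stage's details", "bottom").mkString)
  }
}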
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -121,10 +121,23 @@ private[ui] class StageTableBase(
<div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
}

protected def missingStageRow(stageId: Int): Seq[Node] = {
<td>{stageId}</td> ++
{if (isFairScheduler) {<td>-</td>} else Seq.empty} ++
<td>No data available for this stage</td> ++ // Description
<td></td> ++ // Submitted
<td></td> ++ // Duration
<td></td> ++ // Tasks: Succeeded/Total
<td></td> ++ // Input
<td></td> ++ // Output
<td></td> ++ // Shuffle Read
<td></td> // Shuffle Write
}

protected def stageRow(s: StageInfo): Seq[Node] = {
val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId))
if (stageDataOption.isEmpty) {
return <td>{s.stageId}</td><td>No data available for this stage</td>
return missingStageRow(s.stageId)
}

val stageData = stageDataOption.get
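One note on the change above (commentary, not part of the diff): missingStageRow emits an empty <td> for every remaining column, so the "No data available" row has the same number of cells as the header and the columns stay aligned, whereas the previous two-cell row did not. A toy sketch of the same node-concatenation pattern with scala-xml; names are illustrative:

import scala.xml.Node

// Toy sketch: build a placeholder table row with one cell per expected column.
object MissingRowSketch {
  def missingStageRow(stageId: Int, isFairScheduler: Boolean): Seq[Node] = {
    <td>{stageId}</td> ++
    {if (isFairScheduler) <td>-</td> else Seq.empty} ++
    <td>No data available for this stage</td> ++
    Seq.fill(7)(<td></td>)   // Submitted, Duration, Tasks, Input, Output, Shuffle Read, Shuffle Write
  }

  def main(args: Array[String]): Unit = {
    println(<tr>{missingStageRow(3, isFairScheduler = false)}</tr>)
  }
}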
