Merge remote-tracking branch 'asf/master' into eventlog-download
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
	core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
	core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
	core/src/main/scala/org/apache/spark/ui/SparkUI.scala
harishreedharan committed May 21, 2015
2 parents fd6ab00 + 8730fbb commit 350d7e8
Showing 254 changed files with 9,815 additions and 3,512 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -861,7 +861,7 @@ The following components are provided under a BSD-style license. See project lin

(BSD 3 Clause) core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.1.15 - https://github.com/jpmml/jpmml-model)
- (BSD 3-clause style license) jblas (org.jblas:jblas:1.2.3 - http://jblas.org/)
+ (BSD 3-clause style license) jblas (org.jblas:jblas:1.2.4 - http://jblas.org/)
(BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
(BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
10 changes: 5 additions & 5 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -757,12 +757,12 @@ test_that("parquetFile works with multiple input paths", {
test_that("describe() on a DataFrame", {
df <- jsonFile(sqlCtx, jsonPath)
stats <- describe(df, "age")
- expect_true(collect(stats)[1, "summary"] == "count")
- expect_true(collect(stats)[2, "age"] == 24.5)
- expect_true(collect(stats)[3, "age"] == 5.5)
+ expect_equal(collect(stats)[1, "summary"], "count")
+ expect_equal(collect(stats)[2, "age"], "24.5")
+ expect_equal(collect(stats)[3, "age"], "5.5")
stats <- describe(df)
- expect_true(collect(stats)[4, "name"] == "Andy")
- expect_true(collect(stats)[5, "age"] == 30.0)
+ expect_equal(collect(stats)[4, "name"], "Andy")
+ expect_equal(collect(stats)[5, "age"], "30")
})

unlink(parquetPath)
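Note: the switch from expect_true to expect_equal above also pins down that describe() returns every statistic as a string ("24.5", "30"). A minimal Scala-side sketch of the same behavior (not part of this commit; assumes a DataFrame named df with a numeric "age" column):

// describe() yields a DataFrame whose stat columns are all strings, so the
// mean 24.5 comes back as the string "24.5".
val stats = df.describe("age")
stats.printSchema() // summary: string, age: string
stats.show()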

Large diffs are not rendered by default.

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
@@ -15,32 +15,21 @@
* limitations under the License.
*/

- #dag-viz-graph svg path {
- stroke: #444;
- stroke-width: 1.5px;
- }
-
- #dag-viz-graph svg g.cluster rect {
- stroke-width: 1px;
- }
-
- #dag-viz-graph svg g.node circle {
- fill: #444;
+ #dag-viz-graph a, #dag-viz-graph a:hover {
+ text-decoration: none;
}

- #dag-viz-graph svg g.node rect {
- fill: #C3EBFF;
- stroke: #3EC0FF;
- stroke-width: 1px;
+ #dag-viz-graph .label {
+ font-weight: normal;
+ text-shadow: none;
}

- #dag-viz-graph svg g.node.cached circle {
- fill: #444;
+ #dag-viz-graph svg path {
+ stroke: #444;
+ stroke-width: 1.5px;
}

- #dag-viz-graph svg g.node.cached rect {
- fill: #B3F5C5;
- stroke: #56F578;
+ #dag-viz-graph svg g.cluster rect {
stroke-width: 1px;
}

@@ -61,12 +50,23 @@
stroke-width: 1px;
}

- #dag-viz-graph svg.job g.cluster[class*="stage"] rect {
+ #dag-viz-graph svg.job g.cluster.skipped rect {
+ fill: #D6D6D6;
+ stroke: #B7B7B7;
+ stroke-width: 1px;
+ }
+
+ #dag-viz-graph svg.job g.cluster.stage rect {
fill: #FFFFFF;
stroke: #FF99AC;
stroke-width: 1px;
}

+ #dag-viz-graph svg.job g.cluster.stage.skipped rect {
+ stroke: #ADADAD;
+ stroke-width: 1px;
+ }

#dag-viz-graph svg.job g#cross-stage-edges path {
fill: none;
}
@@ -75,6 +75,20 @@
fill: #333;
}

+ #dag-viz-graph svg.job g.cluster.skipped text {
+ fill: #666;
+ }
+
+ #dag-viz-graph svg.job g.node circle {
+ fill: #444;
+ }
+
+ #dag-viz-graph svg.job g.node.cached circle {
+ fill: #A3F545;
+ stroke: #52C366;
+ stroke-width: 2px;
+ }
+
/* Stage page specific styles */

#dag-viz-graph svg.stage g.cluster rect {
@@ -83,7 +97,7 @@
stroke-width: 1px;
}

- #dag-viz-graph svg.stage g.cluster[class*="stage"] rect {
+ #dag-viz-graph svg.stage g.cluster.stage rect {
fill: #FFFFFF;
stroke: #FFA6B6;
stroke-width: 1px;
@@ -97,11 +111,14 @@
fill: #333;
}

- #dag-viz-graph a, #dag-viz-graph a:hover {
- text-decoration: none;
+ #dag-viz-graph svg.stage g.node rect {
+ fill: #C3EBFF;
+ stroke: #3EC0FF;
+ stroke-width: 1px;
}

- #dag-viz-graph .label {
- font-weight: normal;
- text-shadow: none;
+ #dag-viz-graph svg.stage g.node.cached rect {
+ fill: #B3F5C5;
+ stroke: #52C366;
+ stroke-width: 2px;
}
50 changes: 32 additions & 18 deletions core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -57,9 +57,7 @@ var VizConstants = {
stageSep: 40,
graphPrefix: "graph_",
nodePrefix: "node_",
- stagePrefix: "stage_",
- clusterPrefix: "cluster_",
- stageClusterPrefix: "cluster_stage_"
+ clusterPrefix: "cluster_"
};

var JobPageVizConstants = {
@@ -133,9 +131,7 @@ function renderDagViz(forJob) {
}

// Render
- var svg = graphContainer()
- .append("svg")
- .attr("class", jobOrStage);
+ var svg = graphContainer().append("svg").attr("class", jobOrStage);
if (forJob) {
renderDagVizForJob(svg);
} else {
@@ -185,23 +181,32 @@ function renderDagVizForJob(svgContainer) {
var dot = metadata.select(".dot-file").text();
var stageId = metadata.attr("stage-id");
var containerId = VizConstants.graphPrefix + stageId;
- // Link each graph to the corresponding stage page (TODO: handle stage attempts)
- var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0")
- .find("a")
- .attr("href") + "&expandDagViz=true";
- var container = svgContainer
- .append("a")
- .attr("xlink:href", stageLink)
- .append("g")
- .attr("id", containerId);
+ var isSkipped = metadata.attr("skipped") == "true";
+ var container;
+ if (isSkipped) {
+ container = svgContainer
+ .append("g")
+ .attr("id", containerId)
+ .attr("skipped", "true");
+ } else {
+ // Link each graph to the corresponding stage page (TODO: handle stage attempts)
+ // Use the link from the stage table so it also works for the history server
+ var attemptId = 0
+ var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
+ .select("a")
+ .attr("href") + "&expandDagViz=true";
+ container = svgContainer
+ .append("a")
+ .attr("xlink:href", stageLink)
+ .append("g")
+ .attr("id", containerId);
+ }

// Now we need to shift the container for this stage so it doesn't overlap with
// existing ones, taking into account the position and width of the last stage's
// container. We do not need to do this for the first stage of this job.
if (i > 0) {
- var existingStages = svgContainer
- .selectAll("g.cluster")
- .filter("[class*=\"" + VizConstants.stageClusterPrefix + "\"]");
+ var existingStages = svgContainer.selectAll("g.cluster.stage")
if (!existingStages.empty()) {
var lastStage = d3.select(existingStages[0].pop());
var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));
@@ -214,6 +219,12 @@
// Actually render the stage
renderDot(dot, container, true);

+ // Mark elements as skipped if appropriate. Unfortunately we need to mark all
+ // elements instead of the parent container because of CSS override rules.
+ if (isSkipped) {
+ container.selectAll("g").classed("skipped", true);
+ }
+
// Round corners on rectangles
container
.selectAll("rect")
@@ -243,6 +254,9 @@ function renderDot(dot, container, forJob) {
var renderer = new dagreD3.render();
preprocessGraphLayout(g, forJob);
renderer(container, g);
+
+ // Find the stage cluster and mark it for styling and post-processing
+ container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
}

/* -------------------- *
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -509,7 +509,7 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
"spark.kryoserializer.buffer" ->
Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
- translation = s => s"${s.toDouble * 1000}k")),
+ translation = s => s"${(s.toDouble * 1000).toInt}k")),
"spark.kryoserializer.buffer.max" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
"spark.shuffle.file.buffer" -> Seq(
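Note: a standalone sketch (not from the commit) of what the added .toInt fixes in the translation above — interpolating the raw Double yields a fractional suffix that is not a valid size string:

// Translating a hypothetical legacy spark.kryoserializer.buffer.mb value of "64" (MB):
val legacyMb = "64"
val before = s"${legacyMb.toDouble * 1000}k"         // "64000.0k" -- fractional suffix, invalid size
val after = s"${(legacyMb.toDouble * 1000).toInt}k"  // "64000k"   -- parses cleanly
println(s"$before vs $after")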
78 changes: 75 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Note: Return statements are NOT allowed in the given body.
*/
- private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
+ private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)

// Methods for creating RDDs

@@ -697,6 +697,78 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

+ /**
+ * Creates a new RDD[Long] containing elements from `start` to `end`(exclusive), increased by
+ * `step` every element.
+ *
+ * @note if we need to cache this RDD, we should make sure each partition does not exceed limit.
+ *
+ * @param start the start value.
+ * @param end the end value.
+ * @param step the incremental step
+ * @param numSlices the partition number of the new RDD.
+ * @return
+ */
+ def range(
+ start: Long,
+ end: Long,
+ step: Long = 1,
+ numSlices: Int = defaultParallelism): RDD[Long] = withScope {
+ assertNotStopped()
+ // when step is 0, range will run infinitely
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
+ (safeEnd - safeStart) / step
+ } else {
+ // the remainder has the same sign with range, could add 1 more
+ (safeEnd - safeStart) / step + 1
+ }
+ }
+ parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => {
+ val partitionStart = (i * numElements) / numSlices * step + start
+ val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
+ def getSafeMargin(bi: BigInt): Long =
+ if (bi.isValidLong) {
+ bi.toLong
+ } else if (bi > 0) {
+ Long.MaxValue
+ } else {
+ Long.MinValue
+ }
+ val safePartitionStart = getSafeMargin(partitionStart)
+ val safePartitionEnd = getSafeMargin(partitionEnd)
+
+ new Iterator[Long] {
+ private[this] var number: Long = safePartitionStart
+ private[this] var overflow: Boolean = false
+
+ override def hasNext =
+ if (!overflow) {
+ if (step > 0) {
+ number < safePartitionEnd
+ } else {
+ number > safePartitionEnd
+ }
+ } else false
+
+ override def next() = {
+ val ret = number
+ number += step
+ if (number < ret ^ step < 0) {
+ // we have Long.MaxValue + Long.MaxValue < Long.MaxValue
+ // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step
+ // back, we are pretty sure that we have an overflow.
+ overflow = true
+ }
+ ret
+ }
+ }
+ })
+ }
+
/** Distribute a local Scala collection to form an RDD.
*
* This method is identical to `parallelize`.
@@ -1087,8 +1159,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
withScope {
assertNotStopped()
- val kc = kcf()
- val vc = vcf()
+ val kc = clean(kcf)()
+ val vc = clean(vcf)()
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
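Note: a hedged usage sketch of the new SparkContext.range API above (assumes a live SparkContext named sc, e.g. in spark-shell). Partition bounds follow the (i * numElements) / numSlices arithmetic in the patch:

// Five elements across two partitions: partition 0 holds 0, 2 and partition 1 holds 4, 6, 8.
val evens = sc.range(0L, 10L, step = 2, numSlices = 2)
assert(evens.collect().toSeq == Seq(0L, 2L, 4L, 6L, 8L))

// step may be negative; step == 0 is rejected by the require above.
val countdown = sc.range(5L, 0L, step = -1)
assert(countdown.collect().toSeq == Seq(5L, 4L, 3L, 2L, 1L))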
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -47,6 +47,7 @@ private[spark] class PythonRDD(
pythonIncludes: JList[String],
preservePartitoning: Boolean,
pythonExec: String,
+ pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {
@@ -210,6 +211,8 @@ private[spark] class PythonRDD(
val dataOut = new DataOutputStream(stream)
// Partition index
dataOut.writeInt(split.index)
+ // Python version of driver
+ PythonRDD.writeUTF(pythonVer, dataOut)
// sparkFilesDir
PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
// Python includes (*.zip and *.egg files)
core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -50,8 +50,15 @@ private[spark] object PythonUtils {
/**
* Convert list of T into seq of T (for calling API with varargs)
*/
- def toSeq[T](cols: JList[T]): Seq[T] = {
- cols.toList.toSeq
+ def toSeq[T](vs: JList[T]): Seq[T] = {
+ vs.toList.toSeq
}

+ /**
+ * Convert list of T into array of T (for calling API with array)
+ */
+ def toArray[T](vs: JList[T]): Array[T] = {
+ vs.toArray().asInstanceOf[Array[T]]
+ }
+
/**
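Note: a hypothetical sketch of calling the helpers above. PythonUtils is private[spark] (normally reached via Py4J), so this compiles only inside Spark's own tree; the element type is AnyRef so the unchecked array cast stays safe:

import java.util.{ArrayList => JArrayList}

val jlist = new JArrayList[AnyRef]()
jlist.add("col_a")
jlist.add("col_b")

val asSeq: Seq[AnyRef] = PythonUtils.toSeq(jlist)       // for a varargs-style API
val asArray: Array[AnyRef] = PythonUtils.toArray(jlist) // for an API taking an Array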
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -26,7 +26,8 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
- import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, ApiRootResource, UIRoot}
+ import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
+ UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{SignalLogger, Utils}
@@ -126,7 +127,7 @@ class HistoryServer(
def initialize() {
attachPage(new HistoryPage(this))

- attachHandler(ApiRootResource.getApiServlet(this))
+ attachHandler(ApiRootResource.getServletHandler(this))

attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
