Merge remote-tracking branch 'apache/master' into SPARK-7752
mengxr committed May 21, 2015
2 parents 40ae53e + a70bf06 commit 711d1c6
Showing 58 changed files with 1,522 additions and 291 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1159,8 +1159,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
withScope {
assertNotStopped()
val kc = kcf()
val vc = vcf()
val kc = clean(kcf)()
val vc = clean(vcf)()
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
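The hunk above routes the implicit `WritableConverter` factories through the closure cleaner before invoking them. As a hedged illustration (the path below is made up), this is the kind of call that exercises that code path:

```scala
// Sketch only: "hdfs://namenode/data/events.seq" is a hypothetical path.
// sequenceFile obtains its key/value WritableConverter factories implicitly;
// cleaning those factory closures keeps a non-serializable enclosing object
// from being dragged into the job.
val events = sc.sequenceFile[String, Int]("hdfs://namenode/data/events.seq")
events.take(5).foreach(println)
```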
@@ -25,7 +25,8 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, JsonRootResource, UIRoot}
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{SignalLogger, Utils}
@@ -125,7 +126,7 @@ class HistoryServer(
def initialize() {
attachPage(new HistoryPage(this))

attachHandler(JsonRootResource.getJsonServlet(this))
attachHandler(ApiRootResource.getServletHandler(this))

attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

@@ -19,7 +19,8 @@ package org.apache.spark.deploy.master.ui

import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo, JsonRootResource, UIRoot}
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationsListResource, ApplicationInfo,
UIRoot}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.RpcUtils
@@ -47,7 +48,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
attachPage(new HistoryNotFoundPage(this))
attachPage(masterPage)
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(JsonRootResource.getJsonServlet(this))
attachHandler(ApiRootResource.getServletHandler(this))
attachHandler(createRedirectHandler(
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
attachHandler(createRedirectHandler(
@@ -296,6 +296,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* before sending results to a reducer, similarly to a "combiner" in MapReduce.
*/
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
val cleanedF = self.sparkContext.clean(func)

if (keyClass.isArray) {
throw new SparkException("reduceByKeyLocally() does not support array keys")
@@ -305,15 +306,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val map = new JHashMap[K, V]
iter.foreach { pair =>
val old = map.get(pair._1)
map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
}
Iterator(map)
} : Iterator[JHashMap[K, V]]

val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
m2.foreach { pair =>
val old = m1.get(pair._1)
m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
}
m1
} : JHashMap[K, V]
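For context, a minimal sketch of the operation whose user function is now cleaned (`cleanedF`); the sample data is illustrative:

```scala
// reduceByKeyLocally merges the values for each key and returns a local Map
// on the driver instead of an RDD; the reduce function is shipped to
// executors, which is why it needs to be closure-cleaned like the other
// *ByKey operations.
val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)))
  .reduceByKeyLocally(_ + _)
// counts: scala.collection.Map[String, Int] = Map(a -> 3, b -> 1)
```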
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal

import org.apache.spark.Logging

private[serializer] object SerializationDebugger extends Logging {
private[spark] object SerializationDebugger extends Logging {

/**
* Improve the given NotSerializableException with the serialization path leading from the given
@@ -39,7 +39,7 @@ import org.apache.spark.ui.SparkUI
* HistoryServerSuite.
*/
@Path("/v1")
private[v1] class JsonRootResource extends UIRootFromServletContext {
private[v1] class ApiRootResource extends UIRootFromServletContext {

@Path("applications")
def getApplicationList(): ApplicationListResource = {
@@ -166,11 +166,11 @@ private[v1] class JsonRootResource extends UIRootFromServletContext {

}

private[spark] object JsonRootResource {
private[spark] object ApiRootResource {

def getJsonServlet(uiRoot: UIRoot): ServletContextHandler = {
def getServletHandler(uiRoot: UIRoot): ServletContextHandler = {
val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
jerseyContext.setContextPath("/json")
jerseyContext.setContextPath("/api")
val holder:ServletHolder = new ServletHolder(classOf[ServletContainer])
holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
"com.sun.jersey.api.core.PackagesResourceConfig")
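From a client's point of view, the rename moves the REST root from `/json/v1` to `/api/v1`. A hedged sketch, assuming the default driver UI port 4040:

```scala
import scala.io.Source

// Fetch the applications list from a running driver's REST API.
// Requests to the old /json/v1 prefix no longer resolve after this change.
val json = Source.fromURL("http://localhost:4040/api/v1/applications").mkString
println(json)
```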
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -19,7 +19,8 @@ package org.apache.spark.ui

import java.util.Date

import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo, JsonRootResource, UIRoot}
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
UIRoot}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
@@ -64,7 +65,7 @@ private[spark] class SparkUI private (
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
attachHandler(JsonRootResource.getJsonServlet(this))
attachHandler(ApiRootResource.getServletHandler(this))
// This should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages", stagesTab.handleKillRequest,
@@ -26,6 +26,7 @@
import scala.reflect.ClassTag;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Iterators;
import com.google.common.collect.HashMultiset;
import com.google.common.io.ByteStreams;
import org.junit.After;
@@ -252,6 +253,20 @@ public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
createWriter(false).stop(false);
}

@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());
assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleRecordsWritten());
assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleBytesWritten());
assertEquals(0, taskMetrics.diskBytesSpilled());
assertEquals(0, taskMetrics.memoryBytesSpilled());
}

@Test
public void writeWithoutSpilling() throws Exception {
// In this example, each partition should have exactly one record:
@@ -198,11 +198,11 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with
}

def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = {
HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/json/v1/$path"))
HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/api/v1/$path"))
}

def getUrl(path: String): String = {
HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/json/v1/$path"))
HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/api/v1/$path"))
}

def generateExpectation(name: String, path: String): Unit = {
16 changes: 8 additions & 8 deletions core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -497,7 +497,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
goToUi(sc, "/jobs/job/?id=7")
find("no-info").get.text should be ("No information to display for job 7")

val badJob = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get, "jobs/7"))
val badJob = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get, "jobs/7"))
badJob._1 should be (HttpServletResponse.SC_NOT_FOUND)
badJob._2 should be (None)
badJob._3 should be (Some("unknown job: 7"))
@@ -540,18 +540,18 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before

goToUi(sc, "/stages/stage/?id=12&attempt=0")
find("no-info").get.text should be ("No information to display for Stage 12 (Attempt 0)")
val badStage = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get,"stages/12/0"))
val badStage = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get,"stages/12/0"))
badStage._1 should be (HttpServletResponse.SC_NOT_FOUND)
badStage._2 should be (None)
badStage._3 should be (Some("unknown stage: 12"))

val badAttempt = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get,"stages/19/15"))
val badAttempt = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get,"stages/19/15"))
badAttempt._1 should be (HttpServletResponse.SC_NOT_FOUND)
badAttempt._2 should be (None)
badAttempt._3 should be (Some("unknown attempt for stage 19. Found attempts: [0]"))

val badStageAttemptList = HistoryServerSuite.getContentAndCode(
jsonUrl(sc.ui.get, "stages/12"))
apiUrl(sc.ui.get, "stages/12"))
badStageAttemptList._1 should be (HttpServletResponse.SC_NOT_FOUND)
badStageAttemptList._2 should be (None)
badStageAttemptList._3 should be (Some("unknown stage: 12"))
@@ -561,7 +561,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
test("live UI json application list") {
withSpark(newSparkContext()) { sc =>
val appListRawJson = HistoryServerSuite.getUrl(new URL(
sc.ui.get.appUIAddress + "/json/v1/applications"))
sc.ui.get.appUIAddress + "/api/v1/applications"))
val appListJsonAst = JsonMethods.parse(appListRawJson)
appListJsonAst.children.length should be (1)
val attempts = (appListJsonAst \ "attempts").children
@@ -587,10 +587,10 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
}

def getJson(ui: SparkUI, path: String): JValue = {
JsonMethods.parse(HistoryServerSuite.getUrl(jsonUrl(ui, path)))
JsonMethods.parse(HistoryServerSuite.getUrl(apiUrl(ui, path)))
}

def jsonUrl(ui: SparkUI, path: String): URL = {
new URL(ui.appUIAddress + "/json/v1/applications/test/" + path)
def apiUrl(ui: SparkUI, path: String): URL = {
new URL(ui.appUIAddress + "/api/v1/applications/test/" + path)
}
}
@@ -112,6 +112,7 @@ class ClosureCleanerSuite extends FunSuite {
expectCorrectException { TestUserClosuresActuallyCleaned.testAggregateByKey(pairRdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testFoldByKey(pairRdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testReduceByKey(pairRdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testReduceByKeyLocally(pairRdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapValues(pairRdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapValues(pairRdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testForeachAsync(rdd) }
@@ -315,6 +316,9 @@ private object TestUserClosuresActuallyCleaned {
}
def testFoldByKey(rdd: RDD[(Int, Int)]): Unit = { rdd.foldByKey(0) { case (_, _) => return; 1 } }
def testReduceByKey(rdd: RDD[(Int, Int)]): Unit = { rdd.reduceByKey { case (_, _) => return; 1 } }
def testReduceByKeyLocally(rdd: RDD[(Int, Int)]): Unit = {
rdd.reduceByKeyLocally { case (_, _) => return; 1 }
}
def testMapValues(rdd: RDD[(Int, Int)]): Unit = { rdd.mapValues { _ => return; 1 } }
def testFlatMapValues(rdd: RDD[(Int, Int)]): Unit = { rdd.flatMapValues { _ => return; Seq() } }

95 changes: 95 additions & 0 deletions docs/ml-features.md
@@ -440,5 +440,100 @@ for expanded in polyDF.select("polyFeatures").take(3):
</div>
</div>

## OneHotEncoder

[One-hot encoding](http://en.wikipedia.org/wiki/One-hot) maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = sqlContext.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").foreach(println)
{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}
import com.google.common.collect.Lists;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> jrdd = jsc.parallelize(Lists.newArrayList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
));
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
StringIndexerModel indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df);
DataFrame indexed = indexer.transform(df);

OneHotEncoder encoder = new OneHotEncoder()
.setInputCol("categoryIndex")
.setOutputCol("categoryVec");
DataFrame encoded = encoder.transform(indexed);
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
{% highlight python %}
from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = sqlContext.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(includeFirst=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
{% endhighlight %}
</div>
</div>

# Feature Selectors

10 changes: 5 additions & 5 deletions docs/monitoring.md
@@ -178,9 +178,9 @@ Note that the history server only displays completed Spark jobs. One way to sign

In addition to viewing the metrics in the UI, they are also available as JSON. This gives developers
an easy way to create new visualizations and monitoring tools for Spark. The JSON is available for
both running applications, and in the history server. The endpoints are mounted at `/json/v1`. Eg.,
for the history server, they would typically be accessible at `http://<server-url>:18080/json/v1`, and
for a running application, at `http://localhost:4040/json/v1`.
both running applications, and in the history server. The endpoints are mounted at `/api/v1`. Eg.,
for the history server, they would typically be accessible at `http://<server-url>:18080/api/v1`, and
for a running application, at `http://localhost:4040/api/v1`.

<table class="table">
<tr><th>Endpoint</th><th>Meaning</th></tr>
@@ -240,12 +240,12 @@ These endpoints have been strongly versioned to make it easier to develop applic
* Individual fields will never be removed for any given endpoint
* New endpoints may be added
* New fields may be added to existing endpoints
* New versions of the api may be added in the future at a separate endpoint (eg., `json/v2`). New versions are *not* required to be backwards compatible.
* New versions of the api may be added in the future at a separate endpoint (eg., `api/v2`). New versions are *not* required to be backwards compatible.
* Api versions may be dropped, but only after at least one minor release of co-existing with a new api version

Note that even when examining the UI of a running application, the `applications/[app-id]` portion is
still required, though there is only one application available. Eg. to see the list of jobs for the
running app, you would go to `http://localhost:4040/json/v1/applications/[app-id]/jobs`. This is to
running app, you would go to `http://localhost:4040/api/v1/applications/[app-id]/jobs`. This is to
keep the paths consistent in both modes.

# Metrics
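A small sketch of the path convention described above, assuming the history server's default port 18080; the application id is made up:

```scala
import scala.io.Source

val base = "http://localhost:18080/api/v1"
// List known applications, then query the jobs of one of them. The
// applications/[app-id] segment stays in the URL even when only one
// application exists.
val appList = Source.fromURL(s"$base/applications").mkString
val jobs = Source.fromURL(s"$base/applications/app-20150521010203-0000/jobs").mkString
println(jobs)
```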
15 changes: 14 additions & 1 deletion docs/running-on-yarn.md
@@ -71,9 +71,22 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
</tr>
<tr>
<td><code>spark.yarn.scheduler.heartbeat.interval-ms</code></td>
<td>5000</td>
<td>3000</td>
<td>
The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager.
The value is capped at half the value of YARN's configuration for the expiry interval
(<code>yarn.am.liveness-monitor.expiry-interval-ms</code>).
</td>
</tr>
<tr>
<td><code>spark.yarn.scheduler.initial-allocation.interval</code></td>
<td>200ms</td>
<td>
The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager
when there are pending container allocation requests. It should be no larger than
<code>spark.yarn.scheduler.heartbeat.interval-ms</code>. The allocation interval will doubled on
successive eager heartbeats if pending containers still exist, until
<code>spark.yarn.scheduler.heartbeat.interval-ms</code> is reached.
</td>
</tr>
<tr>
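The two properties documented above are ordinary Spark configuration keys. A hedged sketch of setting them programmatically; the values shown are the documented defaults, not tuning advice:

```scala
import org.apache.spark.SparkConf

// Keep the initial allocation interval no larger than the regular heartbeat
// interval; the allocator doubles it while container requests remain pending.
val conf = new SparkConf()
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "3000")
  .set("spark.yarn.scheduler.initial-allocation.interval", "200ms")
```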