From 0bfacd5c5dd7d10a69bcbcbda630f0843d1cf285 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 5 Mar 2015 11:50:09 -0800 Subject: [PATCH 1/2] [SPARK-6090][MLLIB] add a basic BinaryClassificationMetrics to PySpark/MLlib A simple wrapper around the Scala implementation. `DataFrame` is used for serialization/deserialization. Methods that return `RDD`s are not supported in this PR. davies If we recognize Scala's `Product`s in Py4J, we can easily add wrappers for Scala methods that returns `RDD[(Double, Double)]`. Is it easy to register serializer for `Product` in PySpark? Author: Xiangrui Meng Closes #4863 from mengxr/SPARK-6090 and squashes the following commits: 009a3a3 [Xiangrui Meng] provide schema dcddab5 [Xiangrui Meng] add a basic BinaryClassificationMetrics to PySpark/MLlib --- .../BinaryClassificationMetrics.scala | 8 ++ python/docs/pyspark.mllib.rst | 7 ++ python/pyspark/mllib/evaluation.py | 83 +++++++++++++++++++ python/run-tests | 1 + 4 files changed, 99 insertions(+) create mode 100644 python/pyspark/mllib/evaluation.py diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index ced042e2f96ca..c1d1a224817e8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -22,6 +22,7 @@ import org.apache.spark.Logging import org.apache.spark.SparkContext._ import org.apache.spark.mllib.evaluation.binary._ import org.apache.spark.rdd.{RDD, UnionRDD} +import org.apache.spark.sql.DataFrame /** * :: Experimental :: @@ -53,6 +54,13 @@ class BinaryClassificationMetrics( */ def this(scoreAndLabels: RDD[(Double, Double)]) = this(scoreAndLabels, 0) + /** + * An auxiliary constructor taking a DataFrame. + * @param scoreAndLabels a DataFrame with two double columns: score and label + */ + private[mllib] def this(scoreAndLabels: DataFrame) = + this(scoreAndLabels.map(r => (r.getDouble(0), r.getDouble(1)))) + /** Unpersist intermediate RDDs used in the computation. */ def unpersist() { cumulativeCounts.unpersist() diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst index b706c5e376ef4..15101470afc07 100644 --- a/python/docs/pyspark.mllib.rst +++ b/python/docs/pyspark.mllib.rst @@ -16,6 +16,13 @@ pyspark.mllib.clustering module :members: :undoc-members: +pyspark.mllib.evaluation module +------------------------------- + +.. automodule:: pyspark.mllib.evaluation + :members: + :undoc-members: + pyspark.mllib.feature module ------------------------------- diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py new file mode 100644 index 0000000000000..16cb49cc0cfff --- /dev/null +++ b/python/pyspark/mllib/evaluation.py @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark.mllib.common import JavaModelWrapper +from pyspark.sql import SQLContext +from pyspark.sql.types import StructField, StructType, DoubleType + + +class BinaryClassificationMetrics(JavaModelWrapper): + """ + Evaluator for binary classification. + + >>> scoreAndLabels = sc.parallelize([ + ... (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)], 2) + >>> metrics = BinaryClassificationMetrics(scoreAndLabels) + >>> metrics.areaUnderROC() + 0.70... + >>> metrics.areaUnderPR() + 0.83... + >>> metrics.unpersist() + """ + + def __init__(self, scoreAndLabels): + """ + :param scoreAndLabels: an RDD of (score, label) pairs + """ + sc = scoreAndLabels.ctx + sql_ctx = SQLContext(sc) + df = sql_ctx.createDataFrame(scoreAndLabels, schema=StructType([ + StructField("score", DoubleType(), nullable=False), + StructField("label", DoubleType(), nullable=False)])) + java_class = sc._jvm.org.apache.spark.mllib.evaluation.BinaryClassificationMetrics + java_model = java_class(df._jdf) + super(BinaryClassificationMetrics, self).__init__(java_model) + + def areaUnderROC(self): + """ + Computes the area under the receiver operating characteristic + (ROC) curve. + """ + return self.call("areaUnderROC") + + def areaUnderPR(self): + """ + Computes the area under the precision-recall curve. + """ + return self.call("areaUnderPR") + + def unpersist(self): + """ + Unpersists intermediate RDDs used in the computation. + """ + self.call("unpersist") + + +def _test(): + import doctest + from pyspark import SparkContext + import pyspark.mllib.evaluation + globs = pyspark.mllib.evaluation.__dict__.copy() + globs['sc'] = SparkContext('local[4]', 'PythonTest') + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/python/run-tests b/python/run-tests index a2c2f37a54eda..b7630c356cfae 100755 --- a/python/run-tests +++ b/python/run-tests @@ -75,6 +75,7 @@ function run_mllib_tests() { echo "Run mllib tests ..." run_test "pyspark/mllib/classification.py" run_test "pyspark/mllib/clustering.py" + run_test "pyspark/mllib/evaluation.py" run_test "pyspark/mllib/feature.py" run_test "pyspark/mllib/linalg.py" run_test "pyspark/mllib/rand.py" From 424a86a1ed2a3e6dd54cf8b09fe2f13a1311b7e6 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 5 Mar 2015 12:04:00 -0800 Subject: [PATCH 2/2] [SPARK-6175] Fix standalone executor log links when ephemeral ports or SPARK_PUBLIC_DNS are used This patch fixes two issues with the executor log viewing links added in Spark 1.3. In standalone mode, the log URLs might include a port value of 0 rather than the actual bound port of the UI, which broke the ability to view logs from workers whose web UIs had been configured to bind to ephemeral ports. In addition, the URLs used workers' local hostnames instead of respecting SPARK_PUBLIC_DNS, which prevented this feature from working properly on Spark EC2 clusters because the links would point to internal DNS names instead of external ones. I included tests for both of these bugs: - We now browse to the URLs and verify that they point to the expected pages. - To test SPARK_PUBLIC_DNS, I changed the code that reads the environment variable to do so via `SparkConf.getenv`, then used a custom SparkConf subclass to mock the environment variable (this pattern is used elsewhere in Spark's tests). Author: Josh Rosen Closes #4903 from JoshRosen/SPARK-6175 and squashes the following commits: 5577f41 [Josh Rosen] Remove println cfec135 [Josh Rosen] Use webUi.boundPort and publicAddress in log links 27918c7 [Josh Rosen] Add failing unit tests for standalone log URL viewing c250fbe [Josh Rosen] Respect SparkConf in local-cluster Workers. 422a2ef [Josh Rosen] Use conf.getenv to read SPARK_PUBLIC_DNS --- .../spark/deploy/LocalSparkCluster.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 2 +- .../spark/deploy/worker/ExecutorRunner.scala | 4 +- .../apache/spark/deploy/worker/Worker.scala | 9 ++-- .../scala/org/apache/spark/ui/WebUI.scala | 2 +- .../spark/deploy/JsonProtocolSuite.scala | 2 +- .../spark/deploy/LogUrlsStandaloneSuite.scala | 54 +++++++++++++++---- .../deploy/worker/ExecutorRunnerTest.scala | 2 +- 8 files changed, 57 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 0401b15446a7b..3ab425aab84c8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -59,7 +59,7 @@ class LocalSparkCluster( /* Start the Workers */ for (workerNum <- 1 to numWorkers) { val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker, - memoryPerWorker, masters, null, Some(workerNum)) + memoryPerWorker, masters, null, Some(workerNum), _conf) workerActorSystems += workerSystem } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 4584b730e3420..15814293227ab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -96,7 +96,7 @@ private[spark] class Master( val webUi = new MasterWebUI(this, webUiPort) val masterPublicAddress = { - val envVar = System.getenv("SPARK_PUBLIC_DNS") + val envVar = conf.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else host } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 066d46c4473eb..023f3c6269062 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -44,6 +44,7 @@ private[spark] class ExecutorRunner( val workerId: String, val host: String, val webUiPort: Int, + val publicAddress: String, val sparkHome: File, val executorDir: File, val workerUrl: String, @@ -140,7 +141,8 @@ private[spark] class ExecutorRunner( builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") // Add webUI log urls - val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" + val baseUrl = + s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr") builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 2473a90aa9309..f2e7418f4bf15 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -121,7 +121,7 @@ private[spark] class Worker( val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr) val publicAddress = { - val envVar = System.getenv("SPARK_PUBLIC_DNS") + val envVar = conf.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else host } var webUi: WorkerWebUI = null @@ -362,7 +362,8 @@ private[spark] class Worker( self, workerId, host, - webUiPort, + webUi.boundPort, + publicAddress, sparkHome, executorDir, akkaUrl, @@ -538,10 +539,10 @@ private[spark] object Worker extends Logging { memory: Int, masterUrls: Array[String], workDir: String, - workerNumber: Option[Int] = None): (ActorSystem, Int) = { + workerNumber: Option[Int] = None, + conf: SparkConf = new SparkConf): (ActorSystem, Int) = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems - val conf = new SparkConf val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") val actorName = "Worker" val securityMgr = new SecurityManager(conf) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 9be65a4a39a09..ec68837a1516c 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -47,7 +47,7 @@ private[spark] abstract class WebUI( protected val handlers = ArrayBuffer[ServletContextHandler]() protected var serverInfo: Option[ServerInfo] = None protected val localHostName = Utils.localHostName() - protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName) + protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName) private val className = Utils.getFormattedClassName(this) def getBasePath: String = basePath diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index e955636cf5b59..68b5776fc6515 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite { def createExecutorRunner(): ExecutorRunner = { new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123, - new File("sparkHome"), new File("workDir"), "akka://worker", + "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) } diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index f33bdc73e40ac..54dd7c9c45c61 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -17,35 +17,69 @@ package org.apache.spark.deploy +import java.net.URL + import scala.collection.mutable +import scala.io.Source -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.FunSuite import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener} -import org.apache.spark.{SparkContext, LocalSparkContext} +import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext} -class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter { +class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext { /** Length of time to wait while draining listener events. */ - val WAIT_TIMEOUT_MILLIS = 10000 + private val WAIT_TIMEOUT_MILLIS = 10000 - before { + test("verify that correct log urls get propagated from workers") { sc = new SparkContext("local-cluster[2,1,512]", "test") + + val listener = new SaveExecutorInfo + sc.addSparkListener(listener) + + // Trigger a job so that executors get added + sc.parallelize(1 to 100, 4).map(_.toString).count() + + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + // Browse to each URL to check that it's valid + info.logUrlMap.foreach { case (logType, logUrl) => + val html = Source.fromURL(logUrl).mkString + assert(html.contains(s"$logType log page")) + } + } } - test("verify log urls get propagated from workers") { + test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") { + val SPARK_PUBLIC_DNS = "public_dns" + class MySparkConf extends SparkConf(false) { + override def getenv(name: String) = { + if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS + else super.getenv(name) + } + + override def clone: SparkConf = { + new MySparkConf().setAll(getAll) + } + } + val conf = new MySparkConf() + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) + val listener = new SaveExecutorInfo sc.addSparkListener(listener) - val rdd1 = sc.parallelize(1 to 100, 4) - val rdd2 = rdd1.map(_.toString) - rdd2.setName("Target RDD") - rdd2.count() + // Trigger a job so that executors get added + sc.parallelize(1 to 100, 4).map(_.toString).count() assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) listener.addedExecutorInfos.values.foreach { info => assert(info.logUrlMap.nonEmpty) + info.logUrlMap.values.foreach { logUrl => + assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS) + } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 76511699e5ac5..6fca6321e5a1b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite { val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl") val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123, - new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"), + "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables) assert(builder.command().last === appId)