From 731f683b1bd8abbb83030b6bae14876658bbf098 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Thu, 3 Jul 2014 15:06:58 -0700 Subject: [PATCH 01/18] [SPARK-2109] Setting SPARK_MEM for bin/pyspark does not work. Trivial fix. Author: Prashant Sharma Closes #1050 from ScrapCodes/SPARK-2109/pyspark-script-bug and squashes the following commits: 77072b9 [Prashant Sharma] Changed echos to redirect to STDERR. 13f48a0 [Prashant Sharma] [SPARK-2109] Setting SPARK_MEM for bin/pyspark does not work. --- bin/compute-classpath.sh | 8 ++++---- bin/pyspark | 6 +++--- bin/run-example | 10 +++++----- bin/spark-class | 13 ++++++------- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 2cf4e381c1c88..e81e8c060cb98 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -81,10 +81,10 @@ ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null) # Verify that versions of java used to build the jars and run Spark are compatible jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1) if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then - echo "Loading Spark jar with '$JAR_CMD' failed. " - echo "This is likely because Spark was compiled with Java 7 and run " - echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " - echo "or build Spark with Java 6." + echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2 + echo "This is likely because Spark was compiled with Java 7 and run " 1>&2 + echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2 + echo "or build Spark with Java 6." 1>&2 exit 1 fi diff --git a/bin/pyspark b/bin/pyspark index 0b5ed40e2157d..69b056fe28f2c 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -26,7 +26,7 @@ export SPARK_HOME="$FWDIR" SCALA_VERSION=2.10 if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then - echo "Usage: ./bin/pyspark [options]" + echo "Usage: ./bin/pyspark [options]" 1>&2 $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 exit 0 fi @@ -36,8 +36,8 @@ if [ ! -f "$FWDIR/RELEASE" ]; then # Exit if the user hasn't compiled Spark ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null if [[ $? != 0 ]]; then - echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2 - echo "You need to build Spark before running this program" >&2 + echo "Failed to find Spark assembly in $FWDIR/assembly/target" 1>&2 + echo "You need to build Spark before running this program" 1>&2 exit 1 fi fi diff --git a/bin/run-example b/bin/run-example index e7a5fe3914fbd..942706d733122 100755 --- a/bin/run-example +++ b/bin/run-example @@ -27,9 +27,9 @@ if [ -n "$1" ]; then EXAMPLE_CLASS="$1" shift else - echo "Usage: ./bin/run-example [example-args]" - echo " - set MASTER=XX to use a specific master" - echo " - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" + echo "Usage: ./bin/run-example [example-args]" 1>&2 + echo " - set MASTER=XX to use a specific master" 1>&2 + echo " - can use abbreviated example class name (e.g. 
SparkPi, mllib.LinearRegression)" 1>&2 exit 1 fi @@ -40,8 +40,8 @@ elif [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.ja fi if [[ -z $SPARK_EXAMPLES_JAR ]]; then - echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2 - echo "You need to build Spark before running this program" >&2 + echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2 + echo "You need to build Spark before running this program" 1>&2 exit 1 fi diff --git a/bin/spark-class b/bin/spark-class index 60d9657c0ffcd..04fa52c6756b1 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -33,13 +33,13 @@ export SPARK_HOME="$FWDIR" . $FWDIR/bin/load-spark-env.sh if [ -z "$1" ]; then - echo "Usage: spark-class []" >&2 + echo "Usage: spark-class []" 1>&2 exit 1 fi if [ -n "$SPARK_MEM" ]; then - echo "Warning: SPARK_MEM is deprecated, please use a more specific config option" - echo "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." + echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2 + echo -e "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." 1>&2 fi # Use SPARK_MEM or 512m as the default memory, to be overridden by specific options @@ -147,10 +147,9 @@ fi export CLASSPATH if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then - echo -n "Spark Command: " - echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" - echo "========================================" - echo + echo -n "Spark Command: " 1>&2 + echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 + echo -e "========================================\n" 1>&2 fi exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" From d4c30cd9918e18dde2a52909e36eaef6eb5996ab Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Thu, 3 Jul 2014 17:37:53 -0700 Subject: [PATCH 02/18] [HOTFIX] Synchronize on SQLContext.settings in tests. Let's see if this fixes the ongoing series of test failures in a master build machine (https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT-pre-YARN/SPARK_HADOOP_VERSION=1.0.4,label=centos/81/). pwendell marmbrus Author: Zongheng Yang Closes #1277 from concretevitamin/test-fix and squashes the following commits: 28c88bd [Zongheng Yang] Synchronize on SQLContext.settings in tests. 
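A note on why plain `Collections.synchronizedMap` is not enough on its own: it only makes individual `get`/`put` calls atomic, while these tests run multi-step sequences (clear, set, then assert) against the shared `TestSQLContext`, and suites may execute concurrently. A minimal sketch of the guard pattern the diff below applies (the `withSettings` helper is illustrative, not part of the patch):

```
// Compound operations on a shared synchronized map still need an explicit lock.
val settings = java.util.Collections.synchronizedMap(new java.util.HashMap[String, String]())

def withSettings[T](body: => T): T = settings.synchronized { body }

withSettings {
  settings.clear()
  settings.put("test.key.0", "test.val.0")
  assert(settings.get("test.key.0") == "test.val.0")  // cannot interleave with another suite
}
```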
--- .../scala/org/apache/spark/sql/SQLConf.scala | 2 +- .../org/apache/spark/sql/JoinSuite.scala | 40 +++++------ .../org/apache/spark/sql/SQLConfSuite.scala | 64 +++++++++-------- .../org/apache/spark/sql/SQLQuerySuite.scala | 68 ++++++++++--------- 4 files changed, 91 insertions(+), 83 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 2fe7f94663996..3b5abab969861 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -50,7 +50,7 @@ trait SQLConf { /** ********************** SQLConf functionality methods ************ */ @transient - private val settings = java.util.Collections.synchronizedMap( + protected[sql] val settings = java.util.Collections.synchronizedMap( new java.util.HashMap[String, String]()) def set(props: Properties): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 3d7d5eedbe8ed..054b14f8f7ffa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -39,25 +39,27 @@ class JoinSuite extends QueryTest { test("plans broadcast hash join, given hints") { def mkTest(buildSide: BuildSide, leftTable: String, rightTable: String) = { - TestSQLContext.set("spark.sql.join.broadcastTables", - s"${if (buildSide == BuildRight) rightTable else leftTable}") - val rdd = sql(s"""SELECT * FROM $leftTable JOIN $rightTable ON key = a""") - // Using `sparkPlan` because for relevant patterns in HashJoin to be - // matched, other strategies need to be applied. - val physical = rdd.queryExecution.sparkPlan - val bhj = physical.collect { case j: BroadcastHashJoin if j.buildSide == buildSide => j } - - assert(bhj.size === 1, "planner does not pick up hint to generate broadcast hash join") - checkAnswer( - rdd, - Seq( - (1, "1", 1, 1), - (1, "1", 1, 2), - (2, "2", 2, 1), - (2, "2", 2, 2), - (3, "3", 3, 1), - (3, "3", 3, 2) - )) + TestSQLContext.settings.synchronized { + TestSQLContext.set("spark.sql.join.broadcastTables", + s"${if (buildSide == BuildRight) rightTable else leftTable}") + val rdd = sql( s"""SELECT * FROM $leftTable JOIN $rightTable ON key = a""") + // Using `sparkPlan` because for relevant patterns in HashJoin to be + // matched, other strategies need to be applied. 
+ val physical = rdd.queryExecution.sparkPlan + val bhj = physical.collect { case j: BroadcastHashJoin if j.buildSide == buildSide => j} + + assert(bhj.size === 1, "planner does not pick up hint to generate broadcast hash join") + checkAnswer( + rdd, + Seq( + (1, "1", 1, 1), + (1, "1", 1, 2), + (2, "2", 2, 1), + (2, "2", 2, 2), + (3, "3", 3, 1), + (3, "3", 3, 2) + )) + } } mkTest(BuildRight, "testData", "testData2") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala index 08293f7f0ca30..93792f698cfaf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala @@ -28,46 +28,50 @@ class SQLConfSuite extends QueryTest { val testVal = "test.val.0" test("programmatic ways of basic setting and getting") { - clear() - assert(getOption(testKey).isEmpty) - assert(getAll.toSet === Set()) + TestSQLContext.settings.synchronized { + clear() + assert(getOption(testKey).isEmpty) + assert(getAll.toSet === Set()) - set(testKey, testVal) - assert(get(testKey) == testVal) - assert(get(testKey, testVal + "_") == testVal) - assert(getOption(testKey) == Some(testVal)) - assert(contains(testKey)) + set(testKey, testVal) + assert(get(testKey) == testVal) + assert(get(testKey, testVal + "_") == testVal) + assert(getOption(testKey) == Some(testVal)) + assert(contains(testKey)) - // Tests SQLConf as accessed from a SQLContext is mutable after - // the latter is initialized, unlike SparkConf inside a SparkContext. - assert(TestSQLContext.get(testKey) == testVal) - assert(TestSQLContext.get(testKey, testVal + "_") == testVal) - assert(TestSQLContext.getOption(testKey) == Some(testVal)) - assert(TestSQLContext.contains(testKey)) + // Tests SQLConf as accessed from a SQLContext is mutable after + // the latter is initialized, unlike SparkConf inside a SparkContext. 
+ assert(TestSQLContext.get(testKey) == testVal) + assert(TestSQLContext.get(testKey, testVal + "_") == testVal) + assert(TestSQLContext.getOption(testKey) == Some(testVal)) + assert(TestSQLContext.contains(testKey)) - clear() + clear() + } } test("parse SQL set commands") { - clear() - sql(s"set $testKey=$testVal") - assert(get(testKey, testVal + "_") == testVal) - assert(TestSQLContext.get(testKey, testVal + "_") == testVal) + TestSQLContext.settings.synchronized { + clear() + sql(s"set $testKey=$testVal") + assert(get(testKey, testVal + "_") == testVal) + assert(TestSQLContext.get(testKey, testVal + "_") == testVal) - sql("set mapred.reduce.tasks=20") - assert(get("mapred.reduce.tasks", "0") == "20") - sql("set mapred.reduce.tasks = 40") - assert(get("mapred.reduce.tasks", "0") == "40") + sql("set mapred.reduce.tasks=20") + assert(get("mapred.reduce.tasks", "0") == "20") + sql("set mapred.reduce.tasks = 40") + assert(get("mapred.reduce.tasks", "0") == "40") - val key = "spark.sql.key" - val vs = "val0,val_1,val2.3,my_table" - sql(s"set $key=$vs") - assert(get(key, "0") == vs) + val key = "spark.sql.key" + val vs = "val0,val_1,val2.3,my_table" + sql(s"set $key=$vs") + assert(get(key, "0") == vs) - sql(s"set $key=") - assert(get(key, "0") == "") + sql(s"set $key=") + assert(get(key, "0") == "") - clear() + clear() + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index bf7fafe952303..2c1cb1867010c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -372,38 +372,40 @@ class SQLQuerySuite extends QueryTest { } test("SET commands semantics using sql()") { - clear() - val testKey = "test.key.0" - val testVal = "test.val.0" - val nonexistentKey = "nonexistent" - - // "set" itself returns all config variables currently specified in SQLConf. - assert(sql("SET").collect().size == 0) - - // "set key=val" - sql(s"SET $testKey=$testVal") - checkAnswer( - sql("SET"), - Seq(Seq(testKey, testVal)) - ) - - sql(s"SET ${testKey + testKey}=${testVal + testVal}") - checkAnswer( - sql("set"), - Seq( - Seq(testKey, testVal), - Seq(testKey + testKey, testVal + testVal)) - ) - - // "set key" - checkAnswer( - sql(s"SET $testKey"), - Seq(Seq(testKey, testVal)) - ) - checkAnswer( - sql(s"SET $nonexistentKey"), - Seq(Seq(nonexistentKey, "")) - ) - clear() + TestSQLContext.settings.synchronized { + clear() + val testKey = "test.key.0" + val testVal = "test.val.0" + val nonexistentKey = "nonexistent" + + // "set" itself returns all config variables currently specified in SQLConf. 
+ assert(sql("SET").collect().size == 0) + + // "set key=val" + sql(s"SET $testKey=$testVal") + checkAnswer( + sql("SET"), + Seq(Seq(testKey, testVal)) + ) + + sql(s"SET ${testKey + testKey}=${testVal + testVal}") + checkAnswer( + sql("set"), + Seq( + Seq(testKey, testVal), + Seq(testKey + testKey, testVal + testVal)) + ) + + // "set key" + checkAnswer( + sql(s"SET $testKey"), + Seq(Seq(testKey, testVal)) + ) + checkAnswer( + sql(s"SET $nonexistentKey"), + Seq(Seq(nonexistentKey, "")) + ) + clear() + } } } From fdc4c112e7c2ac585d108d03209a642aa8bab7c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20MATHIEU?= Date: Thu, 3 Jul 2014 18:31:18 -0700 Subject: [PATCH 03/18] Streaming programming guide typos MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix a bad Java code sample and a broken link in the streaming programming guide. Author: Clément MATHIEU Closes #1286 from cykl/streaming-programming-guide-typos and squashes the following commits: b0908cb [Clément MATHIEU] Fix broken URL 9d3c535 [Clément MATHIEU] Spark streaming requires at least two working threads (scala version was OK) --- docs/streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index ce8e58d64a7ed..90a0eef60c200 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -148,7 +148,7 @@ import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; import scala.Tuple2; // Create a StreamingContext with a local master -JavaStreamingContext jssc = new JavaStreamingContext("local", "JavaNetworkWordCount", new Duration(1000)) +JavaStreamingContext jssc = new JavaStreamingContext("local[2]", "JavaNetworkWordCount", new Duration(1000)) {% endhighlight %} Using this context, we then create a new DStream @@ -216,7 +216,7 @@ jssc.awaitTermination(); // Wait for the computation to terminate {% endhighlight %} The complete code can be found in the Spark Streaming example -[JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/index.html?org/apache/spark/examples/streaming/JavaNetworkWordCount.java). +[JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java).
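For reference, a minimal sketch of the Scala counterpart of the corrected Java snippet (assumed here for illustration; per the commit message, the Scala example in the guide already used `local[2]`). With only one local thread the receiver occupies it and no batches are ever processed:

```
import org.apache.spark.streaming.{Seconds, StreamingContext}

// One thread is taken by the socket receiver, the second runs the batch jobs.
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
```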
From 5fa0a05763ab1d527efe20e3b10539ac5ffc36de Mon Sep 17 00:00:00 2001 From: Raymond Liu Date: Thu, 3 Jul 2014 19:24:22 -0700 Subject: [PATCH 04/18] [SPARK-1097] Workaround Hadoop conf ConcurrentModification issue Workaround Hadoop conf ConcurrentModification issue Author: Raymond Liu Closes #1273 from colorant/hadoopRDD and squashes the following commits: 994e98b [Raymond Liu] Address comments e2cda3d [Raymond Liu] Workaround Hadoop conf ConcurrentModification issue --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 98dcbf4e2dbfa..041028514399b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -141,8 +141,8 @@ class HadoopRDD[K, V]( // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456) - broadcastedConf.synchronized { - val newJobConf = new JobConf(broadcastedConf.value.value) + conf.synchronized { + val newJobConf = new JobConf(conf) initLocalJobConfFuncOpt.map(f => f(newJobConf)) HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) newJobConf From 586feb5c9528042420f678f78bacb6c254a5eaf8 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 3 Jul 2014 22:31:41 -0700 Subject: [PATCH 05/18] [SPARK-2350] Don't NPE while launching drivers Prior to this change, we could throw a NPE if we launch a driver while another one is waiting, because removing from an iterator while iterating over it is not safe. Author: Aaron Davidson Closes #1289 from aarondav/master-fail and squashes the following commits: 1cf1cf4 [Aaron Davidson] SPARK-2350: Don't NPE while launching drivers --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 11545b8203707..a304102a49086 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -481,7 +481,7 @@ private[spark] class Master( // First schedule drivers, they take strict precedence over applications val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { - for (driver <- waitingDrivers) { + for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver From 3894a49be9b532cc026d908a0f49bca850504498 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 3 Jul 2014 22:48:23 -0700 Subject: [PATCH 06/18] [SPARK-2307][Reprise] Correctly report RDD blocks on SparkUI **Problem.** The existing code in `ExecutorPage.scala` requires a linear scan through all the blocks to filter out the uncached ones. Every refresh could be expensive if there are many blocks and many executors. **Solution.** The proper semantics should be the following: `StorageStatusListener` should contain only block statuses that are cached. 
This means as soon as a block is unpersisted by any mean, its status should be removed. This is reflected in the changes made in `StorageStatusListener.scala`. Further, the `StorageTab` must stop relying on the `StorageStatusListener` changing a dropped block's status to `StorageLevel.NONE` (which no longer happens). This is reflected in the changes made in `StorageTab.scala` and `StorageUtils.scala`. ---------- If you have been following this chain of PRs like pwendell, you will quickly notice that this reverts the changes in #1249, which reverts the changes in #1080. In other words, we are adding back the changes from #1080, and fixing SPARK-2307 on top of those changes. Please ask questions if you are confused. Author: Andrew Or Closes #1255 from andrewor14/storage-ui-fix-reprise and squashes the following commits: 45416fa [Andrew Or] Merge branch 'master' of github.com:apache/spark into storage-ui-fix-reprise a82ea25 [Andrew Or] Add tests for StorageStatusListener 8773b01 [Andrew Or] Update comment / minor changes 3afde3f [Andrew Or] Correctly report the number of blocks on SparkUI --- .../spark/storage/StorageStatusListener.scala | 17 +- .../apache/spark/storage/StorageUtils.scala | 15 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 4 +- .../apache/spark/ui/exec/ExecutorsTab.scala | 4 +- .../apache/spark/ui/storage/StorageTab.scala | 15 +- .../storage/StorageStatusListenerSuite.scala | 152 ++++++++++++++++++ 6 files changed, 184 insertions(+), 23 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index a6e6627d54e01..41c960c867e2e 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -28,26 +28,31 @@ import org.apache.spark.scheduler._ */ @DeveloperApi class StorageStatusListener extends SparkListener { - private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]() + // This maintains only blocks that are cached (i.e. 
storage level is not StorageLevel.NONE) + private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]() def storageStatusList = executorIdToStorageStatus.values.toSeq /** Update storage status list to reflect updated block statuses */ - def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) { - val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId) + private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) { + val filteredStatus = executorIdToStorageStatus.get(execId) filteredStatus.foreach { storageStatus => updatedBlocks.foreach { case (blockId, updatedStatus) => - storageStatus.blocks(blockId) = updatedStatus + if (updatedStatus.storageLevel == StorageLevel.NONE) { + storageStatus.blocks.remove(blockId) + } else { + storageStatus.blocks(blockId) = updatedStatus + } } } } /** Update storage status list to reflect the removal of an RDD from the cache */ - def updateStorageStatus(unpersistedRDDId: Int) { + private def updateStorageStatus(unpersistedRDDId: Int) { storageStatusList.foreach { storageStatus => val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId) unpersistedBlocksIds.foreach { blockId => - storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L) + storageStatus.blocks.remove(blockId) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index f3bde1df45c79..177281f663367 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -75,17 +75,26 @@ private[spark] object StorageUtils { /** Returns storage information of all RDDs in the given list. 
*/ def rddInfoFromStorageStatus( storageStatuses: Seq[StorageStatus], - rddInfos: Seq[RDDInfo]): Array[RDDInfo] = { + rddInfos: Seq[RDDInfo], + updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = { + + // Mapping from a block ID -> its status + val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*) + + // Record updated blocks, if any + updatedBlocks + .collect { case (id: RDDBlockId, status) => (id, status) } + .foreach { case (id, status) => blockMap(id) = status } // Mapping from RDD ID -> an array of associated BlockStatuses - val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap + val rddBlockMap = blockMap .groupBy { case (k, _) => k.rddId } .mapValues(_.values.toArray) // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information) val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap - val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) => + val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) => // Add up memory, disk and Tachyon sizes val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 9625337ae21a5..95b4a4e91d333 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -110,9 +110,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { val status = listener.storageStatusList(statusId) val execId = status.blockManagerId.executorId val hostPort = status.blockManagerId.hostPort - val rddBlocks = status.blocks.count { case (_, blockStatus) => - blockStatus.storageLevel != StorageLevel.NONE - } + val rddBlocks = status.blocks.size val memUsed = status.memUsed val maxMem = status.maxMem val diskUsed = status.diskUsed diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 58eeb86bf9a3a..5c2d1d1fe75d3 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -39,9 +39,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut * A SparkListener that prepares information to be displayed on the ExecutorsTab */ @DeveloperApi -class ExecutorsListener(storageStatusListener: StorageStatusListener) - extends SparkListener { - +class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener { val executorToTasksActive = HashMap[String, Int]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index c4bb7aab50393..0cc0cf3117173 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -22,7 +22,7 @@ import scala.collection.mutable import org.apache.spark.annotation.DeveloperApi import org.apache.spark.ui._ import org.apache.spark.scheduler._ -import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils} +import org.apache.spark.storage._ /** Web UI showing storage status of all RDD's in the given SparkContext. 
*/ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") { @@ -40,9 +40,7 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage" * A SparkListener that prepares information to be displayed on the BlockManagerUI. */ @DeveloperApi -class StorageListener(storageStatusListener: StorageStatusListener) - extends SparkListener { - +class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener { private val _rddInfoMap = mutable.Map[Int, RDDInfo]() def storageStatusList = storageStatusListener.storageStatusList @@ -51,9 +49,10 @@ class StorageListener(storageStatusListener: StorageStatusListener) def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq /** Update each RDD's info to reflect any updates to the RDD's storage status */ - private def updateRDDInfo() { + private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) { val rddInfos = _rddInfoMap.values.toSeq - val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos) + val updatedRddInfos = + StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks) updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info } } @@ -64,7 +63,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val metrics = taskEnd.taskMetrics if (metrics != null && metrics.updatedBlocks.isDefined) { - updateRDDInfo() + updateRDDInfo(metrics.updatedBlocks.get) } } @@ -79,6 +78,6 @@ class StorageListener(storageStatusListener: StorageStatusListener) } override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized { - updateRDDInfo() + _rddInfoMap.remove(unpersistRDD.rddId) } } diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala new file mode 100644 index 0000000000000..2179c6dd3302e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.scalatest.FunSuite +import org.apache.spark.Success +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler._ + +/** + * Test the behavior of StorageStatusListener in response to all relevant events. 
+ */ +class StorageStatusListenerSuite extends FunSuite { + private val bm1 = BlockManagerId("big", "dog", 1, 1) + private val bm2 = BlockManagerId("fat", "duck", 2, 2) + private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false) + private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false) + + test("block manager added/removed") { + val listener = new StorageStatusListener + + // Block manager add + assert(listener.executorIdToStorageStatus.size === 0) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + assert(listener.executorIdToStorageStatus.size === 1) + assert(listener.executorIdToStorageStatus.get("big").isDefined) + assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1) + assert(listener.executorIdToStorageStatus("big").maxMem === 1000L) + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + assert(listener.executorIdToStorageStatus.size === 2) + assert(listener.executorIdToStorageStatus.get("fat").isDefined) + assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2) + assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + + // Block manager remove + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1)) + assert(listener.executorIdToStorageStatus.size === 1) + assert(!listener.executorIdToStorageStatus.get("big").isDefined) + assert(listener.executorIdToStorageStatus.get("fat").isDefined) + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2)) + assert(listener.executorIdToStorageStatus.size === 0) + assert(!listener.executorIdToStorageStatus.get("big").isDefined) + assert(!listener.executorIdToStorageStatus.get("fat").isDefined) + } + + test("task end without updated blocks") { + val listener = new StorageStatusListener + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + val taskMetrics = new TaskMetrics + + // Task end with no updated blocks + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics)) + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics)) + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + } + + test("task end with updated blocks") { + val listener = new StorageStatusListener + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + val taskMetrics1 = new TaskMetrics + val taskMetrics2 = new TaskMetrics + val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)) + val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L)) + val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L)) + taskMetrics1.updatedBlocks = Some(Seq(block1, block2)) + taskMetrics2.updatedBlocks = Some(Seq(block3)) + + // Task end with new blocks + 
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 2) + assert(listener.executorIdToStorageStatus("fat").blocks.size === 0) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 2) + assert(listener.executorIdToStorageStatus("fat").blocks.size === 1) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0))) + + // Task end with dropped blocks + val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)) + val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)) + val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)) + taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3)) + taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3)) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 1) + assert(listener.executorIdToStorageStatus("fat").blocks.size === 1) + assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0))) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 1) + assert(listener.executorIdToStorageStatus("fat").blocks.size === 0) + assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + } + + test("unpersist RDD") { + val listener = new StorageStatusListener + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + val taskMetrics1 = new TaskMetrics + val taskMetrics2 = new TaskMetrics + val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)) + val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L)) + val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L)) + taskMetrics1.updatedBlocks = Some(Seq(block1, block2)) + taskMetrics2.updatedBlocks = Some(Seq(block3)) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 3) + + // Unpersist RDD + listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 3) + 
listener.onUnpersistRDD(SparkListenerUnpersistRDD(4)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 2) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + listener.onUnpersistRDD(SparkListenerUnpersistRDD(1)) + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + } +} From 97a0bfe1c0261384f09d53f9350de52fb6446d59 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 3 Jul 2014 23:02:36 -0700 Subject: [PATCH 07/18] SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark JIRA: https://issues.apache.org/jira/browse/SPARK-2282 This issue is caused by a buildup of sockets in the TIME_WAIT stage of TCP, which is a stage that lasts for some period of time after the communication closes. This solution simply allows us to reuse sockets that are in TIME_WAIT, to avoid issues with the buildup of the rapid creation of these sockets. Author: Aaron Davidson Closes #1220 from aarondav/SPARK-2282 and squashes the following commits: 2e5cab3 [Aaron Davidson] SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark --- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index f6570d335757a..462e09466bfa6 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -599,6 +599,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort: } else { // This happens on the master, where we pass the updates to Python through a socket val socket = new Socket(serverHost, serverPort) + // SPARK-2282: Immediately reuse closed sockets because we create one per task. + socket.setReuseAddress(true) val in = socket.getInputStream val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize)) out.writeInt(val2.size) From 544880457de556d1ad52e8cb7e1eca19da95f517 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 3 Jul 2014 23:41:54 -0700 Subject: [PATCH 08/18] [SPARK-2059][SQL] Don't throw TreeNodeException in `execution.ExplainCommand` This is a fix for the problem revealed by PR #1265. Currently `HiveComparisonSuite` ignores output of `ExplainCommand` since Catalyst query plan is quite different from Hive query plan. But exceptions throw from `CheckResolution` still breaks test cases. This PR catches any `TreeNodeException` and reports it as part of the query explanation. After merging this PR, PR #1265 can also be merged safely. For a normal query: ``` scala> hql("explain select key from src").foreach(println) ... [Physical execution plan:] [HiveTableScan [key#9], (MetastoreRelation default, src, None), None] ``` For a wrong query with unresolved attribute(s): ``` scala> hql("explain select kay from src").foreach(println) ... 
[Error occurred during query planning: ] [Unresolved attributes: 'kay, tree:] [Project ['kay]] [ LowerCaseSchema ] [ MetastoreRelation default, src, None] ``` Author: Cheng Lian Closes #1294 from liancheng/safe-explain and squashes the following commits: 4318911 [Cheng Lian] Don't throw TreeNodeException in `execution.ExplainCommand` --- .../scala/org/apache/spark/sql/execution/commands.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index acb1b0f4dc229..98d2f89c8ae71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SQLContext, Row} -import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute} +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.{Row, SQLContext} trait Command { /** @@ -86,8 +87,10 @@ case class ExplainCommand( extends LeafNode with Command { // Run through the optimizer to generate the physical plan. - override protected[sql] lazy val sideEffectResult: Seq[String] = { + override protected[sql] lazy val sideEffectResult: Seq[String] = try { "Physical execution plan:" +: context.executePlan(logicalPlan).executedPlan.toString.split("\n") + } catch { case cause: TreeNodeException[_] => + "Error occurred during query planning: " +: cause.getMessage.split("\n") } def execute(): RDD[Row] = { From d43415075b3468fe8aa56de5d2907d409bb96347 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 4 Jul 2014 00:05:27 -0700 Subject: [PATCH 09/18] [SPARK-1199][REPL] Remove VALId and use the original import style for defined classes. This is an alternate solution to #1176. Author: Prashant Sharma Closes #1179 from ScrapCodes/SPARK-1199/repl-fix-second-approach and squashes the following commits: 820b34b [Prashant Sharma] Here we generate two kinds of import wrappers based on whether it is a class or not. --- .../org/apache/spark/repl/SparkIMain.scala | 7 ++++-- .../org/apache/spark/repl/SparkImports.scala | 23 +++++++++++-------- .../org/apache/spark/repl/ReplSuite.scala | 12 ++++++++++ 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 7c83fa9d4d888..3842c291d0b7b 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -744,7 +744,7 @@ import org.apache.spark.util.Utils * * Read! Eval! Print! Some of that not yet centralized here. 
*/ - class ReadEvalPrint(lineId: Int) { + class ReadEvalPrint(val lineId: Int) { def this() = this(freshLineId()) private var lastRun: Run = _ @@ -1241,7 +1241,10 @@ import org.apache.spark.util.Utils // old style beSilentDuring(parse(code)) foreach { ts => ts foreach { t => - withoutUnwrapping(logDebug(asCompactString(t))) + if (isShow || isShowRaw) + withoutUnwrapping(echo(asCompactString(t))) + else + withoutUnwrapping(logDebug(asCompactString(t))) } } } diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala index 419796b68b113..bce5c74b9d0da 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala @@ -182,15 +182,26 @@ trait SparkImports { // ambiguity errors will not be generated. Also, quote // the name of the variable, so that we don't need to // handle quoting keywords separately. + case x: ClassHandler => + // I am trying to guess if the import is a defined class + // This is an ugly hack, I am not 100% sure of the consequences. + // Here we, let everything but "defined classes" use the import with val. + // The reason for this is, otherwise the remote executor tries to pull the + // classes involved and may fail. + for (imv <- x.definedNames) { + val objName = req.lineRep.readPath + code.append("import " + objName + ".INSTANCE" + req.accessPath + ".`" + imv + "`\n") + } + case x => for (imv <- x.definedNames) { if (currentImps contains imv) addWrapper() val objName = req.lineRep.readPath - val valName = "$VAL" + newValId(); + val valName = "$VAL" + req.lineRep.lineId if(!code.toString.endsWith(".`" + imv + "`;\n")) { // Which means already imported - code.append("val " + valName + " = " + objName + ".INSTANCE;\n") - code.append("import " + valName + req.accessPath + ".`" + imv + "`;\n") + code.append("val " + valName + " = " + objName + ".INSTANCE;\n") + code.append("import " + valName + req.accessPath + ".`" + imv + "`;\n") } // code.append("val " + valName + " = " + objName + ".INSTANCE;\n") // code.append("import " + valName + req.accessPath + ".`" + imv + "`;\n") @@ -211,10 +222,4 @@ trait SparkImports { private def membersAtPickler(sym: Symbol): List[Symbol] = beforePickler(sym.info.nonPrivateMembers.toList) - private var curValId = 0 - - private def newValId(): Int = { - curValId += 1 - curValId - } } diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index f4ba8d9cc079b..f2aa42dbcb4fc 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -235,6 +235,18 @@ class ReplSuite extends FunSuite { assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output) } + test("SPARK-1199-simple-reproduce") { + val output = runInterpreter("local-cluster[1,1,512]", + """ + |case class Sum(exp: String, exp2: String) + |val a = Sum("A", "B") + |def b(a: Sum): String = a match { case Sum(_, _) => "Found Sum" } + |b(a) + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + } + if (System.getenv("MESOS_NATIVE_LIBRARY") != null) { test("running on Mesos") { val output = runInterpreter("localquiet", From 0bbe61223eda3f33bbf8992d2a8f0d47813f4873 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Fri, 4 Jul 2014 00:25:31 -0700 Subject: [PATCH 10/18] Update SQLConf.scala MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit use concurrent.ConcurrentHashMap instead of util.Collections.synchronizedMap Author: baishuo(白硕) Closes #1272 from baishuo/master and squashes the following commits: 51ec55d [baishuo(白硕)] Update SQLConf.scala 63da043 [baishuo(白硕)] Update SQLConf.scala 36b6dbd [baishuo(白硕)] Update SQLConf.scala 864faa0 [baishuo(白硕)] Update SQLConf.scala 593096b [baishuo(白硕)] Update SQLConf.scala 7304d9b [baishuo(白硕)] Update SQLConf.scala 843581c [baishuo(白硕)] Update SQLConf.scala 1d3e4a2 [baishuo(白硕)] Update SQLConf.scala 0740f28 [baishuo(白硕)] Update SQLConf.scala --- .../src/main/scala/org/apache/spark/sql/SQLConf.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 3b5abab969861..95ed0f28507fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -64,20 +64,17 @@ trait SQLConf { } def get(key: String): String = { - if (!settings.containsKey(key)) { - throw new NoSuchElementException(key) - } - settings.get(key) + Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key)) } def get(key: String, defaultValue: String): String = { - if (!settings.containsKey(key)) defaultValue else settings.get(key) + Option(settings.get(key)).getOrElse(defaultValue) } def getAll: Array[(String, String)] = settings.asScala.toArray def getOption(key: String): Option[String] = { - if (!settings.containsKey(key)) None else Some(settings.get(key)) + Option(settings.get(key)) } def contains(key: String): Boolean = settings.containsKey(key) From b3e768e154bd7175db44c3ffc3d8f783f15ab776 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 4 Jul 2014 00:53:41 -0700 Subject: [PATCH 11/18] [SPARK-2059][SQL] Add analysis checks This replaces #1263 with a test case. Author: Reynold Xin Author: Michael Armbrust Closes #1265 from rxin/sql-analysis-error and squashes the following commits: a639e01 [Reynold Xin] Added a test case for unresolved attribute analysis. 7371e1b [Reynold Xin] Merge pull request #1263 from marmbrus/analysisChecks 448c088 [Michael Armbrust] Add analysis checks --- .../spark/sql/catalyst/analysis/Analyzer.scala | 16 ++++++++++++++++ .../sql/catalyst/analysis/AnalysisSuite.scala | 8 ++++++++ 2 files changed, 24 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4ebc0e70d946b..c7188469bfb86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -54,10 +55,25 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool ResolveFunctions :: GlobalAggregates :: typeCoercionRules :_*), + Batch("Check Analysis", Once, + CheckResolution), Batch("AnalysisOperators", fixedPoint, EliminateAnalysisOperators) ) + /** + * Makes sure all attributes have been resolved. 
+ */ + object CheckResolution extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = { + plan.transform { + case p if p.expressions.exists(!_.resolved) => + throw new TreeNodeException(p, + s"Unresolved attributes: ${p.expressions.filterNot(_.resolved).mkString(",")}") + } + } + } + /** * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 4c313585c6386..f14df8137683b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.scalatest.FunSuite +import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.plans.logical._ /* Implicit conversions */ @@ -34,4 +35,11 @@ class AnalysisSuite extends FunSuite { analyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) === Project(testRelation.output, testRelation)) } + + test("throw errors for unresolved attributes during analysis") { + val e = intercept[TreeNodeException[_]] { + analyze(Project(Seq(UnresolvedAttribute("abcd")), testRelation)) + } + assert(e.getMessage().toLowerCase.contains("unresolved")) + } } From 5dadda86456e1d3918e320b83aec7e2f1352d95d Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 4 Jul 2014 02:43:57 -0700 Subject: [PATCH 12/18] [SPARK-2234][SQL]Spark SQL basicOperators add Except operator Hi all, I want to submit a Except operator in basicOperators.scala In SQL case.SQL support two table do except operator. 
select * from table1 except select * from table2 This operator support the substract function .Return an table with the elements from `this` that are not in `other`.This operator should limit the input SparkPlan Seq only has two member.The check will later support JIRA:https://issues.apache.org/jira/browse/SPARK-2234 Author: Yanjie Gao Author: YanjieGao <396154235@qq.com> Author: root Author: gaoyanjie Closes #1151 from YanjieGao/patch-6 and squashes the following commits: f19f899 [YanjieGao] add a new blank line in basicoperators.scala 2ff7d73 [YanjieGao] resolve the identation in SqlParser and SparkStrategies fdb5227 [YanjieGao] Merge remote branch 'upstream/master' into patch-6 9940d19 [YanjieGao] make comment less than 100c 09c7413 [YanjieGao] pr 1151 SqlParser add cache ,basic Operator rename Except and modify comment b4b5867 [root] Merge remote branch 'upstream/master' into patch-6 b4c3869 [Yanjie Gao] change SparkStrategies Sparkcontext to SqlContext 7e0ec29 [Yanjie Gao] delete multi test 7e7c83f [Yanjie Gao] delete conflict except b01beb8 [YanjieGao] resolve conflict sparkstrategies and basicOperators 4dc8166 [YanjieGao] resolve conflict fa68a98 [Yanjie Gao] Update joins.scala 8e6bb00 [Yanjie Gao] delete conflict except dd9ba5e [Yanjie Gao] Update joins.scala a0d4e73 [Yanjie Gao] delete skew join 60f5ddd [Yanjie Gao] update less than 100c 0e72233 [Yanjie Gao] update SQLQuerySuite on master branch 7f916b5 [Yanjie Gao] update execution/basicOperators on master branch a28dece [Yanjie Gao] Update logical/basicOperators on master branch a639935 [Yanjie Gao] Update SparkStrategies.scala 3bf7def [Yanjie Gao] update SqlParser on master branch 26f833f [Yanjie Gao] update SparkStrategies.scala on master branch 8dd063f [Yanjie Gao] Update logical/basicOperators on master branch 9847dcf [Yanjie Gao] update SqlParser on masterbranch d6a4604 [Yanjie Gao] Update joins.scala 424c507 [Yanjie Gao] Update joins.scala 7680742 [Yanjie Gao] Update SqlParser.scala a7193d8 [gaoyanjie] [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151 5c8a224 [Yanjie Gao] update the line less than 100c ee066b3 [Yanjie Gao] Update basicOperators.scala 32a80ab [Yanjie Gao] remove except in HiveQl cf232eb [Yanjie Gao] update 1comment 2space3 left.out f1ea3f3 [Yanjie Gao] remove comment 7ea9b91 [Yanjie Gao] remove annotation 7f3d613 [Yanjie Gao] update .map(_.copy()) 670a1bb [Yanjie Gao] Update HiveQl.scala 3fe7746 [Yanjie Gao] Update SQLQuerySuite.scala a36eb0a [Yanjie Gao] Update basicOperators.scala 7859e56 [Yanjie Gao] Update SparkStrategies.scala 052346d [Yanjie Gao] Subtract is conflict with Subtract(e1,e2) aab3785 [Yanjie Gao] Update SQLQuerySuite.scala 4bf80b1 [Yanjie Gao] update subtract to except 4bdd520 [Yanjie Gao] Update SqlParser.scala 2d4bfbd [Yanjie Gao] Update SQLQuerySuite.scala 0808921 [Yanjie Gao] SQLQuerySuite a8a1948 [Yanjie Gao] SparkStrategies 1fe96c0 [Yanjie Gao] HiveQl.scala update 3305e40 [Yanjie Gao] SqlParser 7a98c37 [Yanjie Gao] Update basicOperators.scala cf5b9d0 [Yanjie Gao] Update basicOperators.scala 8945835 [Yanjie Gao] object SkewJoin extends Strategy 2b98962 [Yanjie Gao] Update SqlParser.scala dd32980 [Yanjie Gao] update1 68815b2 [Yanjie Gao] Reformat the code style 4eb43ec [Yanjie Gao] Update basicOperators.scala aa06072 [Yanjie Gao] Reformat the code sytle --- .../org/apache/spark/sql/catalyst/SqlParser.scala | 3 +++ .../catalyst/plans/logical/basicOperators.scala | 6 ++++++ .../spark/sql/execution/SparkStrategies.scala | 2 ++ .../spark/sql/execution/basicOperators.scala 
| 15 +++++++++++++++ .../org/apache/spark/sql/SQLQuerySuite.scala | 14 ++++++++++++++ 5 files changed, 40 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 61762fa2a7c30..ecb11129557bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -118,6 +118,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") + protected val EXCEPT = Keyword("EXCEPT") + // Use reflection to find the reserved words defined in this class. protected val reservedWords = @@ -138,6 +140,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = ( select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | + EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) | insert | cache diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 3e0639867b278..bac5a724647f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -89,6 +89,12 @@ case class Join( } } +case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + def output = left.output + + def references = Set.empty +} + case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0925605b7c4d9..9e036e127bd56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -273,6 +273,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sqlContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil + case logical.Except(left,right) => + execution.Except(planLater(left),planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index a278f1ca98476..4b59e0b4e58e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -205,3 +205,18 @@ object ExistingRdd { case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { override def execute() = rdd } + +/** + * :: DeveloperApi :: + * Returns a table with the elements from left that are not in right using + * the built-in spark subtract function. 
+ */ +@DeveloperApi +case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { + override def output = left.output + + override def execute() = { + left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) + } +} + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 2c1cb1867010c..5c6701e203d17 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -371,6 +371,20 @@ class SQLQuerySuite extends QueryTest { (3, null))) } + test("EXCEPT") { + + checkAnswer( + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer( + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM lowerCaseData "), Nil) + checkAnswer( + sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) + } + test("SET commands semantics using sql()") { TestSQLContext.settings.synchronized { clear() From fc716589384f70096ba2fe1eb6a4131c61b21c7c Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Fri, 4 Jul 2014 10:01:19 -0700 Subject: [PATCH 13/18] HOTFIX: Clean before building docs during release. If the docs are built after a Maven build has finished the intermediate state somehow causes a compiler bug during sbt compilation. This just does a clean before attempting to build the docs. --- dev/create-release/create-release.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index c96afe822c6a9..49bf78f60763a 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -125,6 +125,7 @@ scp spark-* \ # Docs cd spark +sbt/sbt clean cd docs PRODUCTION=1 jekyll build echo "Copying release documentation" From 0db5d5a22ef8dd6e3fbb2759cd25aac46bdf2ce9 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 4 Jul 2014 17:33:07 -0700 Subject: [PATCH 14/18] Added SignalLogger to HistoryServer. This was omitted in #1260. @aarondav Author: Reynold Xin Closes #1300 from rxin/historyServer and squashes the following commits: af720a3 [Reynold Xin] Added SignalLogger to HistoryServer. --- .../org/apache/spark/deploy/history/HistoryServer.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 29a78a56c8ed5..56b38ddfc9313 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.ui.{WebUI, SparkUI, UIUtils} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{SignalLogger, Utils} /** * A web server that renders SparkUIs of completed applications. @@ -169,10 +169,11 @@ class HistoryServer( * * This launches the HistoryServer as a Spark daemon. 
*/ -object HistoryServer { +object HistoryServer extends Logging { private val conf = new SparkConf def main(argStrings: Array[String]) { + SignalLogger.register(log) initSecurity() val args = new HistoryServerArguments(conf, argStrings) val securityManager = new SecurityManager(conf) From 9d006c97371ddf357e0b821d5c6d1535d9b6fe41 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Fri, 4 Jul 2014 19:15:48 -0700 Subject: [PATCH 15/18] [SPARK-2370][SQL] Decrease metadata retrieved for partitioned hive queries. Author: Michael Armbrust Closes #1305 from marmbrus/usePrunerPartitions and squashes the following commits: 744aa20 [Michael Armbrust] Use getAllPartitionsForPruner instead of getPartitions, which avoids retrieving auth data --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 90eacf4268780..7c24b5cabf614 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -53,7 +53,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val table = client.getTable(databaseName, tableName) val partitions: Seq[Partition] = if (table.isPartitioned) { - client.getPartitions(table) + client.getAllPartitionsForPruner(table).toSeq } else { Nil } From 42f3abd529e84f3b26386abe2bde30666e74b64e Mon Sep 17 00:00:00 2001 From: "ankit.bhardwaj" Date: Fri, 4 Jul 2014 22:06:10 -0700 Subject: [PATCH 16/18] [SPARK-2306]:BoundedPriorityQueue is private and not registered with Kry... Because BoundedPriorityQueue is not registered with the Kryo serializer, operations that depend on BoundedPriorityQueue throw exceptions. One such instance is using top along with Kryo serialization. Fixed the issue by registering BoundedPriorityQueue with the Kryo serializer. Author: ankit.bhardwaj Closes #1299 from AnkitBhardwaj12/BoundedPriorityQueueWithKryoIssue and squashes the following commits: a4ae8ed [ankit.bhardwaj] [SPARK-2306]:BoundedPriorityQueue is private and not registered with Kryo --- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 82b62aaf61521..1ce4243194798 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -30,6 +30,7 @@ import org.apache.spark.broadcast.HttpBroadcast import org.apache.spark.scheduler.MapStatus import org.apache.spark.storage._ import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock} +import org.apache.spark.util.BoundedPriorityQueue import scala.reflect.ClassTag @@ -183,7 +184,8 @@ private[serializer] object KryoSerializer { classOf[GetBlock], classOf[MapStatus], classOf[BlockManagerId], - classOf[Array[Byte]] + classOf[Array[Byte]], + classOf[BoundedPriorityQueue[_]] ) } From 3da8df939ec63064692ba64d9188aeea908b305c Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 5 Jul 2014 11:48:08 -0700 Subject: [PATCH 17/18] [SPARK-2366] [SQL] Add column pruning for the right side of LeftSemi join. The right side of a `LeftSemi` join needs only the columns used in the join condition.
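For intuition, the ColumnPruning case added in this patch wraps the right child of a `LeftSemi` join in a Project so that only the attributes referenced by the join condition are produced there. The following toy model of that pruning step is an illustrative sketch in plain Scala, not Catalyst code; the relation and column names are made up for this example.

object LeftSemiPruningSketch {
  sealed trait Plan { def output: Seq[String] }
  case class Relation(name: String, output: Seq[String]) extends Plan
  case class Project(output: Seq[String], child: Plan) extends Plan

  // Keep only the right-side columns referenced by the join condition,
  // inserting a Project only when something is actually pruned away.
  def pruneRight(right: Plan, conditionRefs: Set[String]): Plan = {
    val needed = right.output.filter(conditionRefs.contains)
    if (needed == right.output) right else Project(needed, right)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical query: x(a, b, c) LEFT SEMI JOIN y(a, d, e) ON x.a = y.a.
    // Only y.a is needed from the right side, so y is projected down to it.
    val y = Relation("y", Seq("a", "d", "e"))
    println(pruneRight(y, Set("a"))) // Project(List(a),Relation(y,List(a, d, e)))
  }
}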
Author: Takuya UESHIN Closes #1301 from ueshin/issues/SPARK-2366 and squashes the following commits: 7677a39 [Takuya UESHIN] Update comments. 786d3a0 [Takuya UESHIN] Rename method name. e0957b1 [Takuya UESHIN] Add column pruning for the right side of LeftSemi join. --- .../sql/catalyst/optimizer/Optimizer.scala | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index fb517e40677ec..48ca31e70fe7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -52,6 +52,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] { * - Inserting Projections beneath the following operators: * - Aggregate * - Project <- Join + * - LeftSemiJoin * - Collapse adjacent projections, performing alias substitution. */ object ColumnPruning extends Rule[LogicalPlan] { @@ -62,19 +63,22 @@ object ColumnPruning extends Rule[LogicalPlan] { // Eliminate unneeded attributes from either side of a Join. case Project(projectList, Join(left, right, joinType, condition)) => - // Collect the list of off references required either above or to evaluate the condition. + // Collect the list of all references required either above or to evaluate the condition. val allReferences: Set[Attribute] = projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty) /** Applies a projection only when the child is producing unnecessary attributes */ - def prunedChild(c: LogicalPlan) = - if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { - Project(allReferences.filter(c.outputSet.contains).toSeq, c) - } else { - c - } + def pruneJoinChild(c: LogicalPlan) = prunedChild(c, allReferences) - Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition)) + Project(projectList, Join(pruneJoinChild(left), pruneJoinChild(right), joinType, condition)) + + // Eliminate unneeded attributes from right side of a LeftSemiJoin. + case Join(left, right, LeftSemi, condition) => + // Collect the list of all references required to evaluate the condition. + val allReferences: Set[Attribute] = + condition.map(_.references).getOrElse(Set.empty) + + Join(left, prunedChild(right, allReferences), LeftSemi, condition) // Combine adjacent Projects. case Project(projectList1, Project(projectList2, child)) => @@ -97,6 +101,14 @@ object ColumnPruning extends Rule[LogicalPlan] { // Eliminate no-op Projects case Project(projectList, child) if child.output == projectList => child } + + /** Applies a projection only when the child is producing unnecessary attributes */ + private def prunedChild(c: LogicalPlan, allReferences: Set[Attribute]) = + if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { + Project(allReferences.filter(c.outputSet.contains).toSeq, c) + } else { + c + } } /** From 9d5ecf8205b924dc8a3c13fed68beb78cc5c7553 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 5 Jul 2014 11:51:48 -0700 Subject: [PATCH 18/18] [SPARK-2327] [SQL] Fix nullabilities of Join/Generate/Aggregate. Fix nullabilities of `Join`/`Generate`/`Aggregate` because: - Output attributes of opposite side of `OuterJoin` should be nullable. - Output attributes of generater side of `Generate` should be nullable if `join` is `true` and `outer` is `true`. 
- `AttributeReference` of `computedAggregates` of `Aggregate` should be the same as `aggregateExpression`'s. Author: Takuya UESHIN Closes #1266 from ueshin/issues/SPARK-2327 and squashes the following commits: 3ace83a [Takuya UESHIN] Add withNullability to Attribute and use it to change nullabilities. df1ae53 [Takuya UESHIN] Modify nullabilize to leave attribute if not resolved. 799ce56 [Takuya UESHIN] Add nullabilization to Generate of SparkPlan. a0fc9bc [Takuya UESHIN] Fix scalastyle errors. 0e31e37 [Takuya UESHIN] Fix Aggregate resultAttribute nullabilities. 09532ec [Takuya UESHIN] Fix Generate output nullabilities. f20f196 [Takuya UESHIN] Fix Join output nullabilities. --- .../sql/catalyst/analysis/unresolved.scala | 2 ++ .../catalyst/expressions/BoundAttribute.scala | 16 +++++----- .../expressions/namedExpressions.scala | 3 +- .../plans/logical/basicOperators.scala | 31 ++++++++++++++----- .../spark/sql/execution/Aggregate.scala | 4 +-- .../apache/spark/sql/execution/Generate.scala | 12 +++++-- .../apache/spark/sql/execution/joins.scala | 13 +++++++- 7 files changed, 60 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index d629172a7426e..7abeb032964e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -52,6 +52,7 @@ case class UnresolvedAttribute(name: String) extends Attribute with trees.LeafNo override lazy val resolved = false override def newInstance = this + override def withNullability(newNullability: Boolean) = this override def withQualifiers(newQualifiers: Seq[String]) = this // Unresolved attributes are transient at compile time and don't get evaluated during execution. 
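Taken together, the `withNullability` method being added to `Attribute` and its implementations is a copy-with-changed-flag helper: `UnresolvedAttribute` and `Star` simply return `this`, `BoundReference` delegates to its underlying attribute, and `AttributeReference` (further below) returns itself when the flag is unchanged and a copy otherwise. A self-contained sketch of that pattern in plain Scala (not Spark code; `Attr` and its fields are made-up stand-ins):

case class Attr(name: String, dataType: String, nullable: Boolean) {
  // Copy-with-changed-flag: return the same instance when nothing changes.
  def withNullability(newNullability: Boolean): Attr =
    if (nullable == newNullability) this else copy(nullable = newNullability)
}

object WithNullabilitySketch {
  def main(args: Array[String]): Unit = {
    val key = Attr("key", "IntegerType", nullable = false)
    println(key.withNullability(true))         // Attr(key,IntegerType,true)
    println(key.withNullability(false) eq key) // true: no new object is allocated
  }
}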
@@ -95,6 +96,7 @@ case class Star( override lazy val resolved = false override def newInstance = this + override def withNullability(newNullability: Boolean) = this override def withQualifiers(newQualifiers: Seq[String]) = this def expand(input: Seq[Attribute]): Seq[NamedExpression] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 655d4a08fe93b..9ce1f01056462 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -33,14 +33,16 @@ case class BoundReference(ordinal: Int, baseReference: Attribute) type EvaluatedType = Any - def nullable = baseReference.nullable - def dataType = baseReference.dataType - def exprId = baseReference.exprId - def qualifiers = baseReference.qualifiers - def name = baseReference.name + override def nullable = baseReference.nullable + override def dataType = baseReference.dataType + override def exprId = baseReference.exprId + override def qualifiers = baseReference.qualifiers + override def name = baseReference.name - def newInstance = BoundReference(ordinal, baseReference.newInstance) - def withQualifiers(newQualifiers: Seq[String]) = + override def newInstance = BoundReference(ordinal, baseReference.newInstance) + override def withNullability(newNullability: Boolean) = + BoundReference(ordinal, baseReference.withNullability(newNullability)) + override def withQualifiers(newQualifiers: Seq[String]) = BoundReference(ordinal, baseReference.withQualifiers(newQualifiers)) override def toString = s"$baseReference:$ordinal" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 66ae22e95b60e..934bad8c27294 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -57,6 +57,7 @@ abstract class NamedExpression extends Expression { abstract class Attribute extends NamedExpression { self: Product => + def withNullability(newNullability: Boolean): Attribute def withQualifiers(newQualifiers: Seq[String]): Attribute def toAttribute = this @@ -133,7 +134,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea /** * Returns a copy of this [[AttributeReference]] with changed nullability. 
*/ - def withNullability(newNullability: Boolean) = { + override def withNullability(newNullability: Boolean) = { if (nullable == newNullability) { this } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index bac5a724647f8..0728fa73fb108 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.{LeftSemi, JoinType} +import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.types._ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { @@ -46,10 +46,16 @@ case class Generate( child: LogicalPlan) extends UnaryNode { - protected def generatorOutput: Seq[Attribute] = - alias + protected def generatorOutput: Seq[Attribute] = { + val output = alias .map(a => generator.output.map(_.withQualifiers(a :: Nil))) .getOrElse(generator.output) + if (join && outer) { + output.map(_.withNullability(true)) + } else { + output + } + } override def output = if (join) child.output ++ generatorOutput else generatorOutput @@ -81,11 +87,20 @@ case class Join( condition: Option[Expression]) extends BinaryNode { override def references = condition.map(_.references).getOrElse(Set.empty) - override def output = joinType match { - case LeftSemi => - left.output - case _ => - left.output ++ right.output + + override def output = { + joinType match { + case LeftSemi => + left.output + case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) + case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output + case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) + case _ => + left.output ++ right.output + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala index d85d2d7844e0b..c1ced8bfa404a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala @@ -83,8 +83,8 @@ case class Aggregate( case a: AggregateExpression => ComputedAggregate( a, - BindReferences.bindReference(a, childOutput).asInstanceOf[AggregateExpression], - AttributeReference(s"aggResult:$a", a.dataType, nullable = true)()) + BindReferences.bindReference(a, childOutput), + AttributeReference(s"aggResult:$a", a.dataType, a.nullable)()) } }.toArray diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala index da1e08be59e23..47b3d00262dbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.catalyst.expressions.{Generator, JoinedRow, Literal, Projection} +import org.apache.spark.sql.catalyst.expressions._ /** * :: DeveloperApi :: @@ -39,8 +39,16 @@ case class Generate( child: SparkPlan) extends UnaryNode { + 
protected def generatorOutput: Seq[Attribute] = { + if (join && outer) { + generator.output.map(_.withNullability(true)) + } else { + generator.output + } + } + override def output = - if (join) child.output ++ generator.output else generator.output + if (join) child.output ++ generatorOutput else generatorOutput override def execute() = { if (join) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 32c5f26fe8aa0..7d1f11caae838 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -319,7 +319,18 @@ case class BroadcastNestedLoopJoin( override def otherCopyArgs = sqlContext :: Nil - def output = left.output ++ right.output + override def output = { + joinType match { + case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) + case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output + case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) + case _ => + left.output ++ right.output + } + } /** The Streamed Relation */ def left = streamed
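// Illustrative aside (not part of this patch): the nullability mapping used in
// the join outputs above, modeled in plain Scala. Columns coming from the
// optional side of an outer join become nullable; other join types are left
// untouched.
object JoinNullabilitySketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  case class Col(name: String, nullable: Boolean)

  def outputColumns(left: Seq[Col], right: Seq[Col], joinType: JoinType): Seq[Col] =
    joinType match {
      case LeftOuter  => left ++ right.map(_.copy(nullable = true))
      case RightOuter => left.map(_.copy(nullable = true)) ++ right
      case FullOuter  => (left ++ right).map(_.copy(nullable = true))
      case _          => left ++ right
    }

  def main(args: Array[String]): Unit = {
    val l = Seq(Col("id", nullable = false))
    val r = Seq(Col("value", nullable = false))
    // After a left outer join, the right side's columns may be null:
    println(outputColumns(l, r, LeftOuter)) // List(Col(id,false), Col(value,true))
  }
}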