From 2bea0709f9a1597f3c0dcc68d7c0536f465b3640 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 12 Feb 2014 10:47:52 -0800
Subject: [PATCH 01/12] Merge pull request #589 from mengxr/index.

SPARK-1076: Convert Int to Long to avoid overflow

Patch for PR #578.

Author: Xiangrui Meng

Closes #589 and squashes the following commits:

98c435e [Xiangrui Meng] cast Int to Long to avoid Int overflow
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ec8e311aff5df..d4fc28f55137b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -873,7 +873,7 @@ abstract class RDD[T: ClassTag](
    * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
    */
   def zipWithUniqueId(): RDD[(T, Long)] = {
-    val n = this.partitions.size
+    val n = this.partitions.size.toLong
     this.mapPartitionsWithIndex { case (k, iter) =>
       iter.zipWithIndex.map { case (item, i) =>
         (item, i * n + k)
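Why the cast matters, in a standalone Scala sketch (the numbers are made up for illustration and are not from the patch): with an Int partition count, i * n + k is evaluated in 32-bit arithmetic before being widened, so large indices wrap around and the "unique" ids can collide or go negative. Widening n to Long promotes the whole expression:

    val n = 100000          // partition count (hypothetical)
    val i = 30000           // element index within its partition
    val k = 7               // partition index

    val wrapped: Long = i * n + k          // Int math first: 3000000007 wraps to -1294967289
    val widened: Long = i * n.toLong + k   // Long math throughout: 3000000007

    println(wrapped)  // -1294967289
    println(widened)  // 3000000007

This is exactly the failure mode the one-character change above removes.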
From 7e29e02791a34c0aebdf1b32a522f388efd6cadc Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 12 Feb 2014 16:26:25 -0800
Subject: [PATCH 02/12] Merge pull request #591 from mengxr/transient-new.

SPARK-1076: [Fix #578] add @transient to some vals

I'll try to be more careful next time.

Author: Xiangrui Meng

Closes #591 and squashes the following commits:

2b4f044 [Xiangrui Meng] add @transient to prev in ZippedWithIndexRDD
add @transient to seed in PartitionwiseSampledRDD
---
 .../scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala | 2 +-
 .../main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index 629f7074c17c5..a74309d861318 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -45,7 +45,7 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
 class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
     prev: RDD[T],
     sampler: RandomSampler[T, U],
-    seed: Long = System.nanoTime)
+    @transient seed: Long = System.nanoTime)
   extends RDD[U](prev) {
 
   override def getPartitions: Array[Partition] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index 5e08a469ee7bd..38dc114d80812 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -37,7 +37,7 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
  * @tparam T parent RDD item type
  */
 private[spark]
-class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
+class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {
 
   override def getPartitions: Array[Partition] = {
     val n = prev.partitions.size
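For context, a minimal self-contained sketch of what @transient buys here (the classes are invented for the example, not the ones in the patch): Java serialization skips fields marked @transient, so driver-only state is not shipped along with the serialized object.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    class Carrier(scratchIn: Array[Byte], val label: String) extends Serializable {
      @transient val scratch: Array[Byte] = scratchIn  // stays on the driver, not serialized
    }

    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(new Carrier(new Array[Byte](1024 * 1024), "tiny"))
    out.close()

    println(s"serialized size: ${bytes.size} bytes")  // small: the 1 MB buffer was skipped

In the patch, prev and seed are only needed on the driver while building the partitions (each PartitionwiseSampledRDDPartition already carries its own seed), so marking them @transient keeps them out of the serialized RDD.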
From 7fe7a55c820c6669c4ecccaa8599d05aec1b64be Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 12 Feb 2014 22:35:09 -0800
Subject: [PATCH 03/12] Merge pull request #592 from rxin/test.

SPARK-1088: Create a script for running tests so we can have version specific
testing on Jenkins.

@pwendell

Author: Reynold Xin

Closes #592 and squashes the following commits:

be02359 [Reynold Xin] SPARK-1088: Create a script for running tests so we can have version specific testing on Jenkins.
---
 dev/run-tests | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)
 create mode 100755 dev/run-tests

diff --git a/dev/run-tests b/dev/run-tests
new file mode 100755
index 0000000000000..d65a397b4c8c7
--- /dev/null
+++ b/dev/run-tests
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Go to the Spark project root directory
+FWDIR="$(cd `dirname $0`/..; pwd)"
+cd $FWDIR
+
+# Remove work directory
+rm -rf ./work
+
+# Fail fast
+set -e
+
+echo "========================================================================="
+echo "Running Scala style checks"
+echo "========================================================================="
+sbt/sbt clean scalastyle
+
+echo "========================================================================="
+echo "Running Spark unit tests"
+echo "========================================================================="
+sbt/sbt assembly test
+
+echo "========================================================================="
+echo "Running PySpark tests"
+echo "========================================================================="
+if [ -z "$PYSPARK_PYTHON" ]; then
+  export PYSPARK_PYTHON=/usr/local/bin/python2.7
+fi
+./python/run-tests

From 6ee0ad8fba660b48ef32dfa2f015b59cd5353a6e Mon Sep 17 00:00:00 2001
From: Andrew Ash
Date: Wed, 12 Feb 2014 23:23:06 -0800
Subject: [PATCH 04/12] SPARK-1073 Keep GitHub pull request title as commit summary

The first line of a git commit message is the line that's used with many
git tools as the most concise textual description of that message. The
most common use that I see is in the short log, which is a one line per
commit log of recent commits.

This commit moves the line

    Merge pull request #%s from %s.

Lower into the message to reserve the first line of the resulting commit
for the much more important pull request title.

http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html

Author: Andrew Ash

Closes #574 from ash211/gh-pr-merge-title and squashes the following commits:

b240823 [Andrew Ash] More merge_message improvements
d2986db [Andrew Ash] Keep GitHub pull request title as commit summary
---
 dev/merge_spark_pr.py | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py
index 03f8fc28938e8..93621c96daf2d 100755
--- a/dev/merge_spark_pr.py
+++ b/dev/merge_spark_pr.py
@@ -96,19 +96,20 @@ def merge_pr(pr_num, target_ref):
     commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
                        '--pretty=format:%h [%an] %s']).split("\n\n")
 
-    merge_message = "Merge pull request #%s from %s.\n\n%s\n\n%s" % (
-        pr_num, pr_repo_desc, title, body)
-    merge_message_parts = merge_message.split("\n\n")
     merge_message_flags = []
 
-    for p in merge_message_parts:
-        merge_message_flags = merge_message_flags + ["-m", p]
+    for p in [title, body]:
+        merge_message_flags += ["-m", p]
+
     authors = "\n".join(["Author: %s" % a for a in distinct_authors])
-    merge_message_flags = merge_message_flags + ["-m", authors]
-    merge_message_flags = merge_message_flags + [
-        "-m", "Closes #%s and squashes the following commits:" % pr_num]
+
+    merge_message_flags += ["-m", authors]
+
+    # The string "Closes #%s" string is required for GitHub to correctly close the PR
+    merge_message_flags += ["-m",
+        "Closes #%s from %s and squashes the following commits:" % (pr_num, pr_repo_desc)]
     for c in commits:
-        merge_message_flags = merge_message_flags + ["-m", c]
+        merge_message_flags += ["-m", c]
 
     run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags)

From a3bb86179e452d348f7e8bd3859befb3ff1f4df1 Mon Sep 17 00:00:00 2001
From: Bijay Bisht
Date: Wed, 12 Feb 2014 23:42:10 -0800
Subject: [PATCH 05/12] Ported hadoopClient jar for < 1.0.1 fix

#522 got messed after i rewrote the branch hadoop_jar_name. So created a new one.

Author: Bijay Bisht

Closes #584 from bijaybisht/hadoop_jar_name_on_0.9.0 and squashes the following commits:

1b6fb3c [Bijay Bisht] Ported hadoopClient jar for < 1.0.1 fix

(cherry picked from commit 8093de1bb319e86dcf0d6d8d97b043a2bc1aa8f2)
Signed-off-by: Patrick Wendell
---
 project/SparkBuild.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index bb79f0cd73e2b..74bad66cfd018 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -86,6 +86,7 @@ object SparkBuild extends Build {
     case None => DEFAULT_YARN
     case Some(v) => v.toBoolean
   }
+  lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client"
 
   // Conditionally include the yarn sub-project
   lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core)
@@ -269,7 +270,7 @@ object SparkBuild extends Build {
         "org.apache.mesos" % "mesos" % "0.13.0",
         "net.java.dev.jets3t" % "jets3t" % "0.7.1",
         "org.apache.derby" % "derby" % "10.4.2.0" % "test",
-        "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+        "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
         "org.apache.avro" % "avro" % "1.7.4",
         "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
         "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
@@ -373,7 +374,7 @@ object SparkBuild extends Build {
   def yarnEnabledSettings = Seq(
     libraryDependencies ++= Seq(
       // Exclude rule required for all ?
-      "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+      "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
       "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
       "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
      "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib)
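The new hadoopClient val is a pure function of hadoopVersion, so the rule is easy to sanity-check in isolation. A small Scala sketch of the same mapping (standalone, not part of the build file):

    def hadoopClientArtifact(hadoopVersion: String): String =
      if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core"
      else "hadoop-client"

    assert(hadoopClientArtifact("0.20.205") == "hadoop-core")   // old monolithic artifact name
    assert(hadoopClientArtifact("1.0.0") == "hadoop-core")
    assert(hadoopClientArtifact("1.0.4") == "hadoop-client")    // 1.0.1 and later
    assert(hadoopClientArtifact("2.2.0") == "hadoop-client")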
From 5fa53c02fc89af7328a659045c954d72bf0b8664 Mon Sep 17 00:00:00 2001
From: Christian Lundgren
Date: Thu, 13 Feb 2014 12:44:21 -0800
Subject: [PATCH 06/12] Add c3 instance types to Spark EC2

The number of disks for the c3 instance types taken from here:
http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html#StorageOnInstanceTypes

Author: Christian Lundgren

Closes #595 from chrisavl/branch-0.9 and squashes the following commits:

c8af5f9 [Christian Lundgren] Add c3 instance types to Spark EC2

(cherry picked from commit 19b4bb2b444f1dbc4592bf3d58b17652e0ae6d6b)
Signed-off-by: Patrick Wendell
---
 ec2/spark_ec2.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index e7cb5ab3ff9b0..eea63fea2af4f 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -189,7 +189,12 @@ def get_spark_ami(opts):
     "i2.xlarge": "hvm",
     "i2.2xlarge": "hvm",
     "i2.4xlarge": "hvm",
-    "i2.8xlarge": "hvm"
+    "i2.8xlarge": "hvm",
+    "c3.large": "pvm",
+    "c3.xlarge": "pvm",
+    "c3.2xlarge": "pvm",
+    "c3.4xlarge": "pvm",
+    "c3.8xlarge": "pvm"
   }
   if opts.instance_type in instance_types:
     instance_type = instance_types[opts.instance_type]
@@ -486,7 +491,12 @@ def get_num_disks(instance_type):
     "i2.xlarge": 1,
     "i2.2xlarge": 2,
     "i2.4xlarge": 4,
-    "i2.8xlarge": 8
+    "i2.8xlarge": 8,
+    "c3.large": 2,
+    "c3.xlarge": 2,
+    "c3.2xlarge": 2,
+    "c3.4xlarge": 2,
+    "c3.8xlarge": 2
   }
   if instance_type in disks_by_instance:
     return disks_by_instance[instance_type]

From 2414ed310ef6424894c37e8c2e8c461cbf880c78 Mon Sep 17 00:00:00 2001
From: Shivaram Venkataraman
Date: Thu, 13 Feb 2014 14:26:06 -0800
Subject: [PATCH 07/12] Merge pull request #598 from shivaram/master.

Update spark_ec2 to use 0.9.0 by default

Backports change from branch-0.9

Author: Shivaram Venkataraman

Closes #598 and squashes the following commits:

f6d3ed0 [Shivaram Venkataraman] Update spark_ec2 to use 0.9.0 by default
Backports change from branch-0.9
---
 ec2/spark_ec2.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index eea63fea2af4f..e88f80aa62627 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -70,7 +70,7 @@ def parse_args():
            "slaves across multiple (an additional $0.01/Gb for bandwidth" +
            "between zones applies)")
   parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use")
-  parser.add_option("-v", "--spark-version", default="0.8.0",
+  parser.add_option("-v", "--spark-version", default="0.9.0",
       help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
   parser.add_option("--spark-git-repo",
       default="https://github.com/apache/incubator-spark",
@@ -157,7 +157,7 @@ def is_active(instance):
 
 # Return correct versions of Spark and Shark, given the supplied Spark version
 def get_spark_shark_version(opts):
-  spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0"}
+  spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0", "0.8.1": "0.8.1", "0.9.0": "0.9.0"}
   version = opts.spark_version.replace("v", "")
   if version not in spark_shark_map:
     print >> stderr, "Don't know about Spark version: %s" % version

From eec4bd1a1731dc84a8de70a2a12251ee134f2296 Mon Sep 17 00:00:00 2001
From: Andrew Ash
Date: Fri, 14 Feb 2014 10:01:01 -0800
Subject: [PATCH 08/12] Typo: Standlone -> Standalone

Author: Andrew Ash

Closes #601 from ash211/typo and squashes the following commits:

9cd43ac [Andrew Ash] Change docs references to metrics.properties, not metrics.conf
3813ff1 [Andrew Ash] Typo: mulitcast -> multicast
873bd2f [Andrew Ash] Typo: Standlone -> Standalone
---
 conf/metrics.properties.template | 2 +-
 docs/monitoring.md               | 6 +++---
 docs/spark-standalone.md         | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 1c3d94e1b0831..30bcab0c93302 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -67,7 +67,7 @@
 #   period 10 Poll period
 #   unit seconds Units of poll period
 #   ttl 1 TTL of messages sent by Ganglia
-#   mode multicast Ganglia network mode ('unicast' or 'mulitcast')
+#   mode multicast Ganglia network mode ('unicast' or 'multicast')
 
 # org.apache.spark.metrics.sink.JmxSink
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 0d5eb7065e9f0..e9b1d2b2f4ffb 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -19,7 +19,7 @@ You can access this interface by simply opening `http://:4040` in a
 If multiple SparkContexts are running on the same host, they will bind to succesive ports
 beginning with 4040 (4041, 4042, etc).
 
-Spark's Standlone Mode cluster manager also has its own
+Spark's Standalone Mode cluster manager also has its own
 [web UI](spark-standalone.html#monitoring-and-logging).
 
 Note that in both of these UIs, the tables are sortable by clicking their headers,
@@ -31,7 +31,7 @@ Spark has a configurable metrics system based on the
 [Coda Hale Metrics Library](http://metrics.codahale.com/).
 This allows users to report Spark metrics to a variety of sinks including HTTP, JMX, and CSV
 files. The metrics system is configured via a configuration file that Spark expects to be present
-at `$SPARK_HOME/conf/metrics.conf`. A custom file location can be specified via the
+at `$SPARK_HOME/conf/metrics.properties`. A custom file location can be specified via the
 `spark.metrics.conf` [configuration property](configuration.html#spark-properties).
 Spark's metrics are decoupled into different
 _instances_ corresponding to Spark components. Within each instance, you can configure a
@@ -54,7 +54,7 @@ Each instance can report to zero or more _sinks_. Sinks are contained in the
 * `GraphiteSink`: Sends metrics to a Graphite node.
 
 The syntax of the metrics configuration file is defined in an example configuration file,
-`$SPARK_HOME/conf/metrics.conf.template`.
+`$SPARK_HOME/conf/metrics.properties.template`.
 
 # Advanced Instrumentation
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 3388c14ec4d48..51fb3a4f7f8c5 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -10,7 +10,7 @@ In addition to running on the Mesos or YARN cluster managers, Spark also provide
 
 # Installing Spark Standalone to a Cluster
 
-To install Spark Standlone mode, you simply place a compiled version of Spark on each node on the cluster. You can obtain pre-built versions of Spark with each release or [build it yourself](index.html#building).
+To install Spark Standalone mode, you simply place a compiled version of Spark on each node on the cluster. You can obtain pre-built versions of Spark with each release or [build it yourself](index.html#building).
 
 # Starting a Cluster Manually
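As the corrected documentation says, the metrics system reads $SPARK_HOME/conf/metrics.properties by default, and spark.metrics.conf can point it at another file. A minimal sketch of setting that property from application code (the path and app name are invented for the example):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("metrics-config-demo")                            // hypothetical app name
      .set("spark.metrics.conf", "/etc/spark/metrics.properties")   // hypothetical custom path

    val sc = new SparkContext(conf)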
From 1cad3813879cf6a968cfbf427da37fbb4f39dc86 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sun, 16 Feb 2014 12:25:38 -0800
Subject: [PATCH 09/12] [SPARK-1092] print warning information if user use SPARK_MEM to regulate executor memory usage

https://spark-project.atlassian.net/browse/SPARK-1092?jql=project%20%3D%20SPARK

print warning information if user set SPARK_MEM to regulate memory usage of executors

----

OUTDATED: Currently, users will usually set SPARK_MEM to control the memory usage of driver
programs, (in spark-class)

    91 JAVA_OPTS="$OUR_JAVA_OPTS"
    92 JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
    93 JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

if they didn't set spark.executor.memory, the value in this environment variable will also
affect the memory usage of executors, because the following lines in SparkContext

    private[spark] val executorMemory = conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_MEM")))
      .map(Utils.memoryStringToMb)
      .getOrElse(512)

also since SPARK_MEM has been (proposed to) deprecated in SPARK-929
(https://spark-project.atlassian.net/browse/SPARK-929) and the corresponding PR
(https://github.com/apache/incubator-spark/pull/104) we should remove this line

Author: CodingCat

Closes #602 from CodingCat/clean_spark_mem and squashes the following commits:

302bb28 [CodingCat] print warning information if user use SPARK_MEM to regulate executor memory usage
---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 25f7a5ed1c250..5a6d06b66e85b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -170,6 +170,11 @@ class SparkContext(
     .map(Utils.memoryStringToMb)
     .getOrElse(512)
 
+  if (!conf.contains("spark.executor.memory") && sys.env.contains("SPARK_MEM")) {
+    logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
+      "deprecated, instead use spark.executor.memory")
+  }
+
   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
   // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
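The resolution order the warning guards is the one quoted in the commit message: an explicit spark.executor.memory wins, SPARK_MEM is only a deprecated fallback, and 512 MB is the default. A standalone sketch of that precedence (memoryStringToMb below is a simplified stand-in for Utils.memoryStringToMb and only handles the "512m" / "2g" forms):

    def memoryStringToMb(s: String): Int = {
      val lower = s.trim.toLowerCase
      if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
      else lower.stripSuffix("m").toInt
    }

    def executorMemoryMb(conf: Map[String, String], env: Map[String, String]): Int =
      conf.get("spark.executor.memory")      // explicit setting wins
        .orElse(env.get("SPARK_MEM"))        // deprecated fallback that triggers the new warning
        .map(memoryStringToMb)
        .getOrElse(512)                      // default

    assert(executorMemoryMb(Map("spark.executor.memory" -> "2g"), Map("SPARK_MEM" -> "8g")) == 2048)
    assert(executorMemoryMb(Map.empty, Map("SPARK_MEM" -> "1g")) == 1024)
    assert(executorMemoryMb(Map.empty, Map.empty) == 512)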
From 73cfdcfe71c3fdd4a9c5e71c8568f25371dab9bf Mon Sep 17 00:00:00 2001
From: Bijay Bisht
Date: Sun, 16 Feb 2014 16:52:57 -0800
Subject: [PATCH 10/12] fix for https://spark-project.atlassian.net/browse/SPARK-1052

Author: Bijay Bisht

Closes #568 from bijaybisht/SPARK-1052 and squashes the following commits:

da70395 [Bijay Bisht] fix for https://spark-project.atlassian.net/browse/SPARK-1052 - comments incorporated
fdb1d94 [Bijay Bisht] fix for https://spark-project.atlassian.net/browse/SPARK-1052

(cherry picked from commit e797c1abd9692f1b7ec290e4c83d31fd106e6b05)
Signed-off-by: Aaron Davidson
---
 .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 49781485d9f96..fef291eea0257 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -130,13 +130,8 @@ private[spark] class MesosSchedulerBackend(
   private def createExecArg(): Array[Byte] = {
     if (execArgs == null) {
       val props = new HashMap[String, String]
-      val iterator = System.getProperties.entrySet.iterator
-      while (iterator.hasNext) {
-        val entry = iterator.next
-        val (key, value) = (entry.getKey.toString, entry.getValue.toString)
-        if (key.startsWith("spark.")) {
-          props(key) = value
-        }
+      for ((key,value) <- sc.conf.getAll) {
+        props(key) = value
       }
       // Serialize the map as an array of (String, String) pairs
       execArgs = Utils.serialize(props.toArray)
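Why reading sc.conf is the safer source, in a short sketch (illustrative, not from the patch): configuration applied programmatically to a SparkConf never shows up in the JVM system properties, so the old loop over System.getProperties would silently drop it.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.memory", "2g")   // set in code, not via -Dspark.executor.memory=...

    // Old source: only "spark.*" JVM system properties.
    val fromSysProps = sys.props.toMap.filterKeys(_.startsWith("spark."))

    // New source: everything the SparkConf holds, however it was set.
    val fromConf = conf.getAll.toMap

    assert(fromConf.contains("spark.executor.memory"))
    assert(!fromSysProps.contains("spark.executor.memory"))  // unless it was also passed as a -D flag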
From 5af4477c2b191f1ffd9814192d7017e85cf95191 Mon Sep 17 00:00:00 2001
From: Punya Biswal
Date: Sun, 16 Feb 2014 18:55:59 -0800
Subject: [PATCH 11/12] Add subtractByKey to the JavaPairRDD wrapper

Author: Punya Biswal

Closes #600 from punya/subtractByKey-java and squashes the following commits:

e961913 [Punya Biswal] Hide implicit ClassTags from Java API
c5d317b [Punya Biswal] Add subtractByKey to the JavaPairRDD wrapper
---
 .../apache/spark/api/java/JavaPairRDD.scala | 23 +++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 5b1bf9476e4d5..cd0aea0cb3d1f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -277,6 +277,29 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
   def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] =
     fromRDD(rdd.subtract(other, p))
 
+  /**
+   * Return an RDD with the pairs from `this` whose keys are not in `other`.
+   *
+   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+   * RDD will be <= us.
+   */
+  def subtractByKey[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, V] = {
+    implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    fromRDD(rdd.subtractByKey(other))
+  }
+
+  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+  def subtractByKey[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, V] = {
+    implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    fromRDD(rdd.subtractByKey(other, numPartitions))
+  }
+
+  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+  def subtractByKey[W](other: JavaPairRDD[K, W], p: Partitioner): JavaPairRDD[K, V] = {
+    implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    fromRDD(rdd.subtractByKey(other, p))
+  }
+
   /**
    * Return a copy of the RDD partitioned using the specified partitioner.
    */
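For reference, a usage sketch of the subtractByKey semantics these wrappers delegate to, written against the Scala API (the data is made up, and sc is assumed to be an existing SparkContext):

    import org.apache.spark.SparkContext._   // brings in the pair-RDD functions

    val left = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
    val right = sc.parallelize(Seq("b" -> "x", "d" -> "y"))

    // Keeps the pairs of left whose key does not occur in right; the value
    // types of the two RDDs do not have to match.
    val diff = left.subtractByKey(right)
    assert(diff.collect().toSet == Set("a" -> 1, "c" -> 3))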
From c0795cf481d47425ec92f4fd0780e2e0b3fdda85 Mon Sep 17 00:00:00 2001
From: Andrew Ash
Date: Mon, 17 Feb 2014 09:51:55 -0800
Subject: [PATCH 12/12] Worker registration logging fix

Author: Andrew Ash

Closes #608 from ash211/patch-7 and squashes the following commits:

bd85f2a [Andrew Ash] Worker registration logging fix
---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0bb9a9a937ff0..e44f90c1412e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -169,7 +169,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
     {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
-        host, workerPort, cores, Utils.megabytesToString(memory)))
+        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
       if (state == RecoveryState.STANDBY) {
         // ignore, don't send response
       } else if (idToWorker.contains(id)) {