
Commit

Merge git://git.apache.org/incubator-spark into persist-ui
andrewor14 committed Feb 17, 2014
2 parents b3976b0 + c0795cf commit 4dfcd22
Showing 14 changed files with 112 additions and 31 deletions.
2 changes: 1 addition & 1 deletion conf/metrics.properties.template
@@ -67,7 +67,7 @@
# period 10 Poll period
# unit seconds Units of poll period
# ttl 1 TTL of messages sent by Ganglia
# mode multicast Ganglia network mode ('unicast' or 'mulitcast')
# mode multicast Ganglia network mode ('unicast' or 'multicast')

# org.apache.spark.metrics.sink.JmxSink

5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -173,6 +173,11 @@ class SparkContext(
.map(Utils.memoryStringToMb)
.getOrElse(512)

if (!conf.contains("spark.executor.memory") && sys.env.contains("SPARK_MEM")) {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, instead use spark.executor.memory")
}

// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
// Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
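
A minimal sketch of the setting the warning above recommends, using SparkConf; the app name and memory value are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Set per-executor memory explicitly instead of relying on the deprecated SPARK_MEM.
val conf = new SparkConf()
  .setAppName("example")               // illustrative app name
  .set("spark.executor.memory", "2g")  // property recommended by the warning above
val sc = new SparkContext(conf)
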
23 changes: 23 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -277,6 +277,29 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] =
fromRDD(rdd.subtract(other, p))

/**
* Return an RDD with the pairs from `this` whose keys are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
def subtractByKey[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, V] = {
implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
fromRDD(rdd.subtractByKey(other))
}

/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
def subtractByKey[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, V] = {
implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
fromRDD(rdd.subtractByKey(other, numPartitions))
}

/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
def subtractByKey[W](other: JavaPairRDD[K, W], p: Partitioner): JavaPairRDD[K, V] = {
implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
fromRDD(rdd.subtractByKey(other, p))
}

/**
* Return a copy of the RDD partitioned using the specified partitioner.
*/
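
A short usage sketch of the semantics described in the new doc comments, written against the underlying Scala pair-RDD API; the data is made up and result ordering may vary:

import org.apache.spark.SparkContext._  // brings pair-RDD operations such as subtractByKey into scope

val left  = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val right = sc.parallelize(Seq(("b", 99)))
// Keep only the pairs from `left` whose keys do not appear in `right`.
left.subtractByKey(right).collect()     // Array((a,1), (c,3))
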
@@ -169,7 +169,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
{
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else if (idToWorker.contains(id)) {
@@ -45,7 +45,7 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
prev: RDD[T],
sampler: RandomSampler[T, U],
seed: Long = System.nanoTime)
@transient seed: Long = System.nanoTime)
extends RDD[U](prev) {

override def getPartitions: Array[Partition] = {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -873,7 +873,7 @@ abstract class RDD[T: ClassTag](
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
*/
def zipWithUniqueId(): RDD[(T, Long)] = {
val n = this.partitions.size
val n = this.partitions.size.toLong
this.mapPartitionsWithIndex { case (k, iter) =>
iter.zipWithIndex.map { case (item, i) =>
(item, i * n + k)
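
A hedged sketch of what the `.toLong` protects: the unique id is `i * n + k` for element `i` of partition `k`, so keeping `n` as a Long avoids Int overflow when a partition holds a very large number of elements (example data assumed):

val rdd = sc.parallelize(1 to 6, 3)     // 3 partitions, so n = 3
rdd.zipWithUniqueId().collect()
// partition k hands out ids k, k + n, k + 2n, ...
// e.g. Array((1,0), (2,3), (3,1), (4,4), (5,2), (6,5)), depending on how elements are split
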
@@ -37,7 +37,7 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
* @tparam T parent RDD item type
*/
private[spark]
class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {

override def getPartitions: Array[Partition] = {
val n = prev.partitions.size
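
A minimal illustrative sketch (hypothetical class, not from the diff) of what `@transient` does here and in the PartitionwiseSampledRDD change above: constructor parameters that are retained as fields get serialized with the object unless marked `@transient`, so driver-only values can be kept out of what is shipped to executors.

// `driverOnlyState` is used only on the driver (e.g. to compute partitions) and is not serialized;
// `shippedValue` is serialized with the object as usual.
class Example(@transient val driverOnlyState: Seq[Int], val shippedValue: Int) extends Serializable
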
@@ -130,13 +130,8 @@ private[spark] class MesosSchedulerBackend(
private def createExecArg(): Array[Byte] = {
if (execArgs == null) {
val props = new HashMap[String, String]
val iterator = System.getProperties.entrySet.iterator
while (iterator.hasNext) {
val entry = iterator.next
val (key, value) = (entry.getKey.toString, entry.getValue.toString)
if (key.startsWith("spark.")) {
props(key) = value
}
for ((key,value) <- sc.conf.getAll) {
props(key) = value
}
// Serialize the map as an array of (String, String) pairs
execArgs = Utils.serialize(props.toArray)
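
A hedged sketch of the call the replacement relies on: `SparkConf.getAll` returns the key/value pairs set on the context's configuration, so the old filtering of JVM system properties by the `spark.` prefix is no longer needed (the memory setting below is illustrative):

import scala.collection.mutable.HashMap
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.executor.memory", "2g")  // illustrative entry
val props = new HashMap[String, String]
for ((key, value) <- conf.getAll) {   // Array[(String, String)] of all configured properties
  props(key) = value
}
// In createExecArg above, props is then serialized and handed to each Mesos executor.
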
19 changes: 10 additions & 9 deletions dev/merge_spark_pr.py
@@ -96,19 +96,20 @@ def merge_pr(pr_num, target_ref):
commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
'--pretty=format:%h [%an] %s']).split("\n\n")

merge_message = "Merge pull request #%s from %s.\n\n%s\n\n%s" % (
pr_num, pr_repo_desc, title, body)
merge_message_parts = merge_message.split("\n\n")
merge_message_flags = []

for p in merge_message_parts:
merge_message_flags = merge_message_flags + ["-m", p]
for p in [title, body]:
merge_message_flags += ["-m", p]

authors = "\n".join(["Author: %s" % a for a in distinct_authors])
merge_message_flags = merge_message_flags + ["-m", authors]
merge_message_flags = merge_message_flags + [
"-m", "Closes #%s and squashes the following commits:" % pr_num]

merge_message_flags += ["-m", authors]

# The string "Closes #%s" string is required for GitHub to correctly close the PR
merge_message_flags += ["-m",
"Closes #%s from %s and squashes the following commits:" % (pr_num, pr_repo_desc)]
for c in commits:
merge_message_flags = merge_message_flags + ["-m", c]
merge_message_flags += ["-m", c]

run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags)

46 changes: 46 additions & 0 deletions dev/run-tests
@@ -0,0 +1,46 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Go to the Spark project root directory
FWDIR="$(cd `dirname $0`/..; pwd)"
cd $FWDIR

# Remove work directory
rm -rf ./work

# Fail fast
set -e

echo "========================================================================="
echo "Running Scala style checks"
echo "========================================================================="
sbt/sbt clean scalastyle

echo "========================================================================="
echo "Running Spark unit tests"
echo "========================================================================="
sbt/sbt assembly test

echo "========================================================================="
echo "Running PySpark tests"
echo "========================================================================="
if [ -z "$PYSPARK_PYTHON" ]; then
export PYSPARK_PYTHON=/usr/local/bin/python2.7
fi
./python/run-tests
6 changes: 3 additions & 3 deletions docs/monitoring.md
@@ -19,7 +19,7 @@ You can access this interface by simply opening `http://<driver-node>:4040` in a
If multiple SparkContexts are running on the same host, they will bind to succesive ports
beginning with 4040 (4041, 4042, etc).

Spark's Standlone Mode cluster manager also has its own
Spark's Standalone Mode cluster manager also has its own
[web UI](spark-standalone.html#monitoring-and-logging).

Note that in both of these UIs, the tables are sortable by clicking their headers,
@@ -31,7 +31,7 @@ Spark has a configurable metrics system based on the
[Coda Hale Metrics Library](http://metrics.codahale.com/).
This allows users to report Spark metrics to a variety of sinks including HTTP, JMX, and CSV
files. The metrics system is configured via a configuration file that Spark expects to be present
at `$SPARK_HOME/conf/metrics.conf`. A custom file location can be specified via the
at `$SPARK_HOME/conf/metrics.properties`. A custom file location can be specified via the
`spark.metrics.conf` [configuration property](configuration.html#spark-properties).
Spark's metrics are decoupled into different
_instances_ corresponding to Spark components. Within each instance, you can configure a
@@ -54,7 +54,7 @@ Each instance can report to zero or more _sinks_. Sinks are contained in the
* `GraphiteSink`: Sends metrics to a Graphite node.

The syntax of the metrics configuration file is defined in an example configuration file,
`$SPARK_HOME/conf/metrics.conf.template`.
`$SPARK_HOME/conf/metrics.properties.template`.
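
A small hedged sketch of the custom-location option mentioned above, set through the `spark.metrics.conf` property; the path is illustrative:

import org.apache.spark.SparkConf

// Point Spark at a metrics file outside $SPARK_HOME/conf.
val conf = new SparkConf().set("spark.metrics.conf", "/etc/spark/custom-metrics.properties")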

# Advanced Instrumentation

2 changes: 1 addition & 1 deletion docs/spark-standalone.md
@@ -10,7 +10,7 @@ In addition to running on the Mesos or YARN cluster managers, Spark also provide

# Installing Spark Standalone to a Cluster

To install Spark Standlone mode, you simply place a compiled version of Spark on each node on the cluster. You can obtain pre-built versions of Spark with each release or [build it yourself](index.html#building).
To install Spark Standalone mode, you simply place a compiled version of Spark on each node on the cluster. You can obtain pre-built versions of Spark with each release or [build it yourself](index.html#building).

# Starting a Cluster Manually

18 changes: 14 additions & 4 deletions ec2/spark_ec2.py
@@ -70,7 +70,7 @@ def parse_args():
"slaves across multiple (an additional $0.01/Gb for bandwidth" +
"between zones applies)")
parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use")
parser.add_option("-v", "--spark-version", default="0.8.0",
parser.add_option("-v", "--spark-version", default="0.9.0",
help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
parser.add_option("--spark-git-repo",
default="https://github.com/apache/incubator-spark",
@@ -157,7 +157,7 @@ def is_active(instance):

# Return correct versions of Spark and Shark, given the supplied Spark version
def get_spark_shark_version(opts):
spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0"}
spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0", "0.8.1": "0.8.1", "0.9.0": "0.9.0"}
version = opts.spark_version.replace("v", "")
if version not in spark_shark_map:
print >> stderr, "Don't know about Spark version: %s" % version
@@ -189,7 +189,12 @@ def get_spark_ami(opts):
"i2.xlarge": "hvm",
"i2.2xlarge": "hvm",
"i2.4xlarge": "hvm",
"i2.8xlarge": "hvm"
"i2.8xlarge": "hvm",
"c3.large": "pvm",
"c3.xlarge": "pvm",
"c3.2xlarge": "pvm",
"c3.4xlarge": "pvm",
"c3.8xlarge": "pvm"
}
if opts.instance_type in instance_types:
instance_type = instance_types[opts.instance_type]
@@ -486,7 +491,12 @@ def get_num_disks(instance_type):
"i2.xlarge": 1,
"i2.2xlarge": 2,
"i2.4xlarge": 4,
"i2.8xlarge": 8
"i2.8xlarge": 8,
"c3.large": 2,
"c3.xlarge": 2,
"c3.2xlarge": 2,
"c3.4xlarge": 2,
"c3.8xlarge": 2
}
if instance_type in disks_by_instance:
return disks_by_instance[instance_type]
5 changes: 3 additions & 2 deletions project/SparkBuild.scala
@@ -86,6 +86,7 @@ object SparkBuild extends Build {
case None => DEFAULT_YARN
case Some(v) => v.toBoolean
}
lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client"

// Conditionally include the yarn sub-project
lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core)
@@ -269,7 +270,7 @@ object SparkBuild extends Build {
"org.apache.mesos" % "mesos" % "0.13.0",
"net.java.dev.jets3t" % "jets3t" % "0.7.1",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.avro" % "avro" % "1.7.4",
"org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
"org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
@@ -373,7 +374,7 @@ object SparkBuild extends Build {
def yarnEnabledSettings = Seq(
libraryDependencies ++= Seq(
// Exclude rule required for all ?
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib)
