diff --git a/README.md b/README.md index e2d1dcb5672ff..9c2e32b90f162 100644 --- a/README.md +++ b/README.md @@ -39,17 +39,22 @@ And run the following command, which should also return 1000: ## Example Programs Spark also comes with several sample programs in the `examples` directory. -To run one of them, use `./bin/run-example `. For example: +To run one of them, use `./bin/run-example [params]`. For example: - ./bin/run-example org.apache.spark.examples.SparkLR local[2] + ./bin/run-example org.apache.spark.examples.SparkLR -will run the Logistic Regression example locally on 2 CPUs. +will run the Logistic Regression example locally. -Each of the example programs prints usage help if no params are given. +You can set the MASTER environment variable when running examples to submit +examples to a cluster. This can be a mesos:// or spark:// URL, +"yarn-cluster" or "yarn-client" to run on YARN, and "local" to run +locally with one thread, or "local[N]" to run locally with N threads. You +can also use an abbreviated class name if the class is in the `examples` +package. For instance: -All of the Spark samples take a `` parameter that is the cluster URL -to connect to. This can be a mesos:// or spark:// URL, or "local" to run -locally with one thread, or "local[N]" to run locally with N threads. + MASTER=spark://host:7077 ./bin/run-example SparkPi + +Many of the example programs print usage help if no params are given. ## Running Tests diff --git a/bin/pyspark b/bin/pyspark index f5558853e8a4e..10e35e0f1734e 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -31,7 +31,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null if [[ $? != 0 ]]; then echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2 - echo "You need to build Spark with sbt/sbt assembly before running this program" >&2 + echo "You need to build Spark before running this program" >&2 exit 1 fi fi diff --git a/bin/run-example b/bin/run-example index d8a94f2e31e07..146951ac0ee56 100755 --- a/bin/run-example +++ b/bin/run-example @@ -17,28 +17,10 @@ # limitations under the License. # -cygwin=false -case "`uname`" in - CYGWIN*) cygwin=true;; -esac - SCALA_VERSION=2.10 -# Figure out where the Scala framework is installed FWDIR="$(cd `dirname $0`/..; pwd)" - -# Export this as SPARK_HOME export SPARK_HOME="$FWDIR" - -. $FWDIR/bin/load-spark-env.sh - -if [ -z "$1" ]; then - echo "Usage: run-example []" >&2 - exit 1 -fi - -# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack -# to avoid the -sources and -doc packages that are built by publish-local. EXAMPLES_DIR="$FWDIR"/examples if [ -f "$FWDIR/RELEASE" ]; then @@ -49,46 +31,29 @@ fi if [[ -z $SPARK_EXAMPLES_JAR ]]; then echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2 - echo "You need to build Spark with sbt/sbt assembly before running this program" >&2 + echo "You need to build Spark before running this program" >&2 exit 1 fi +EXAMPLE_MASTER=${MASTER:-"local[*]"} -# Since the examples JAR ideally shouldn't include spark-core (that dependency should be -# "provided"), also add our standard Spark classpath, built using compute-classpath.sh. -CLASSPATH=`$FWDIR/bin/compute-classpath.sh` -CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH" - -if $cygwin; then - CLASSPATH=`cygpath -wp $CLASSPATH` - export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR` -fi - -# Find java binary -if [ -n "${JAVA_HOME}" ]; then - RUNNER="${JAVA_HOME}/bin/java" -else - if [ `command -v java` ]; then - RUNNER="java" - else - echo "JAVA_HOME is not set" >&2 - exit 1 - fi -fi - -# Set JAVA_OPTS to be able to load native libraries and to set heap size -JAVA_OPTS="$SPARK_JAVA_OPTS" -# Load extra JAVA_OPTS from conf/java-opts, if it exists -if [ -e "$FWDIR/conf/java-opts" ] ; then - JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`" +if [ -n "$1" ]; then + EXAMPLE_CLASS="$1" + shift +else + echo "usage: ./bin/run-example [example-args]" + echo " - set MASTER=XX to use a specific master" + echo " - can use abbreviated example class name (e.g. SparkPi, mllib.MovieLensALS)" + echo + exit -1 fi -export JAVA_OPTS -if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then - echo -n "Spark Command: " - echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" - echo "========================================" - echo +if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then + EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS" fi -exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" +./bin/spark-submit \ + --master $EXAMPLE_MASTER \ + --class $EXAMPLE_CLASS \ + $SPARK_EXAMPLES_JAR \ + "$@" diff --git a/bin/spark-class b/bin/spark-class index 72f8b9bf9a495..6480ccb58d6aa 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -114,7 +114,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar") if [ "$num_jars" -eq "0" ]; then echo "Failed to find Spark assembly in $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2 - echo "You need to build Spark with 'sbt/sbt assembly' before running this program." >&2 + echo "You need to build Spark before running this program." >&2 exit 1 fi if [ "$num_jars" -gt "1" ]; then diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e39723f38347c..16de6f7cdb100 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -160,6 +160,7 @@ object SparkSubmit { // each deploy mode; we iterate through these below val options = List[OptionAssigner]( OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"), + OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"), OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true, sysProp = "spark.driver.extraClassPath"), OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true, @@ -167,7 +168,7 @@ object SparkSubmit { OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true, sysProp = "spark.driver.extraLibraryPath"), OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"), - OptionAssigner(args.name, YARN, true, clOption = "--name"), + OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"), OptionAssigner(args.queue, YARN, true, clOption = "--queue"), OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"), @@ -188,8 +189,7 @@ object SparkSubmit { OptionAssigner(args.jars, YARN, true, clOption = "--addJars"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), - OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"), - OptionAssigner(args.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name") + OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars") ) // For client mode make any added jars immediately visible on the classpath @@ -205,7 +205,8 @@ object SparkSubmit { (clusterManager & opt.clusterManager) != 0) { if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) - } else if (opt.sysProp != null) { + } + if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a1ca612cc9a09..9d8d8044f07eb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -330,9 +330,9 @@ abstract class RDD[T: ClassTag]( if (shuffle) { // include a shuffle step so that our upstream tasks are still distributed new CoalescedRDD( - new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)), + new ShuffledRDD[Int, T, (Int, T)](map(x => (Utils.random.nextInt(), x)), new HashPartitioner(numPartitions)), - numPartitions).keys + numPartitions).values } else { new CoalescedRDD(this, numPartitions) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 4b0324f2b5447..ed0f56f1abdf5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -57,15 +57,12 @@ private[spark] object ShuffleMapTask { } def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = { - synchronized { - val loader = Thread.currentThread.getContextClassLoader - val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) - val ser = SparkEnv.get.closureSerializer.newInstance() - val objIn = ser.deserializeStream(in) - val rdd = objIn.readObject().asInstanceOf[RDD[_]] - val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]] - (rdd, dep) - } + val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) + val ser = SparkEnv.get.closureSerializer.newInstance() + val objIn = ser.deserializeStream(in) + val rdd = objIn.readObject().asInstanceOf[RDD[_]] + val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]] + (rdd, dep) } // Since both the JarSet and FileSet have the same format this is used for both. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 98fa0df6ec289..6aed322eeb185 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -250,7 +250,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus // Remove the block from the slave's BlockManager. // Doesn't actually wait for a confirmation and the message might get lost. // If message loss becomes frequent, we should add retry logic here. - blockManager.get.slaveActor ! RemoveBlock(blockId) + blockManager.get.slaveActor.ask(RemoveBlock(blockId))(akkaTimeout) } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index d7e3b22ed476e..c9edb03cdeb0f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -104,7 +104,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5", "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar", "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt", - "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", + "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "beauty", "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) @@ -122,7 +122,8 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { childArgsStr should include ("--num-executors 6") mainClass should be ("org.apache.spark.deploy.yarn.Client") classpath should have length (0) - sysProps should have size (1) + sysProps("spark.app.name") should be ("beauty") + sysProps("SPARK_SUBMIT") should be ("true") } test("handles YARN client mode") { @@ -130,8 +131,8 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5", "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar", "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt", - "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "thejar.jar", - "arg1", "arg2") + "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "trill", + "thejar.jar", "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) childArgs.mkString(" ") should be ("arg1 arg2") @@ -140,6 +141,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { classpath should contain ("one.jar") classpath should contain ("two.jar") classpath should contain ("three.jar") + sysProps("spark.app.name") should be ("trill") sysProps("spark.executor.memory") should be ("5g") sysProps("spark.executor.cores") should be ("5") sysProps("spark.yarn.queue") should be ("thequeue") diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md index cac01ded60d94..b6dd553bbe06b 100644 --- a/docs/building-with-maven.md +++ b/docs/building-with-maven.md @@ -96,7 +96,7 @@ Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.o The ScalaTest plugin also supports running only a specific test suite as follows: - $ mvn -Dhadoop.version=... -Dsuites=org.apache.spark.repl.ReplSuite test + $ mvn -Dhadoop.version=... -DwildcardSuites=org.apache.spark.repl.ReplSuite test ## Continuous Compilation ## diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 68183ee8b4613..c563594296802 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -53,7 +53,7 @@ For example: --driver-memory 4g \ --executor-memory 2g \ --executor-cores 1 - examples/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \ + lib/spark-examples*.jar \ yarn-cluster 5 The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the "Viewing Logs" section below for how to see driver and executor logs. diff --git a/make-distribution.sh b/make-distribution.sh index 759e555b4b69a..1cc2844703fbb 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -40,6 +40,8 @@ # set -o pipefail +set -e + # Figure out where the Spark framework is installed FWDIR="$(cd `dirname $0`; pwd)" DISTDIR="$FWDIR/dist" diff --git a/sbin/spark-executor b/sbin/spark-executor index de5bfab563125..336549f29c9ce 100755 --- a/sbin/spark-executor +++ b/sbin/spark-executor @@ -19,5 +19,8 @@ FWDIR="$(cd `dirname $0`/..; pwd)" +export PYTHONPATH=$FWDIR/python:$PYTHONPATH +export PYTHONPATH=$FWDIR/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH + echo "Running spark-executor with framework dir = $FWDIR" exec $FWDIR/bin/spark-class org.apache.spark.executor.MesosExecutorBackend diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index ce2dde0631ed9..2924189077b7d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -35,10 +35,10 @@ private[spark] class YarnClientSchedulerBackend( private[spark] def addArg(optionName: String, envVar: String, sysProp: String, arrayBuf: ArrayBuffer[String]) { - if (System.getProperty(sysProp) != null) { - arrayBuf += (optionName, System.getProperty(sysProp)) - } else if (System.getenv(envVar) != null) { + if (System.getenv(envVar) != null) { arrayBuf += (optionName, System.getenv(envVar)) + } else if (sc.getConf.contains(sysProp)) { + arrayBuf += (optionName, sc.getConf.get(sysProp)) } }