
Commit

Merge branch 'master' of github.com:apache/spark into yarn-docs
Conflicts:
	docs/running-on-yarn.md
andrewor14 committed May 9, 2014
2 parents 924f04c + bd67551 commit f8ca990
Showing 22 changed files with 130 additions and 142 deletions.
19 changes: 12 additions & 7 deletions README.md
@@ -39,17 +39,22 @@ And run the following command, which should also return 1000:
## Example Programs

Spark also comes with several sample programs in the `examples` directory.
To run one of them, use `./bin/run-example <class> <params>`. For example:
To run one of them, use `./bin/run-example <class> [params]`. For example:

./bin/run-example org.apache.spark.examples.SparkLR local[2]
./bin/run-example org.apache.spark.examples.SparkLR

will run the Logistic Regression example locally on 2 CPUs.
will run the Logistic Regression example locally.

Each of the example programs prints usage help if no params are given.
You can set the MASTER environment variable when running examples to submit
examples to a cluster. This can be a mesos:// or spark:// URL,
"yarn-cluster" or "yarn-client" to run on YARN, and "local" to run
locally with one thread, or "local[N]" to run locally with N threads. You
can also use an abbreviated class name if the class is in the `examples`
package. For instance:

All of the Spark samples take a `<master>` parameter that is the cluster URL
to connect to. This can be a mesos:// or spark:// URL, or "local" to run
locally with one thread, or "local[N]" to run locally with N threads.
MASTER=spark://host:7077 ./bin/run-example SparkPi

Many of the example programs print usage help if no params are given.

## Running Tests

2 changes: 1 addition & 1 deletion bin/pyspark
@@ -31,7 +31,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null
if [[ $? != 0 ]]; then
echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2
echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
echo "You need to build Spark before running this program" >&2
exit 1
fi
fi
71 changes: 18 additions & 53 deletions bin/run-example
@@ -17,28 +17,10 @@
# limitations under the License.
#

cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
esac

SCALA_VERSION=2.10

# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"

. $FWDIR/bin/load-spark-env.sh

if [ -z "$1" ]; then
echo "Usage: run-example <example-class> [<args>]" >&2
exit 1
fi

# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
EXAMPLES_DIR="$FWDIR"/examples

if [ -f "$FWDIR/RELEASE" ]; then
@@ -49,46 +49,29 @@ fi

if [[ -z $SPARK_EXAMPLES_JAR ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2
echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
echo "You need to build Spark before running this program" >&2
exit 1
fi

EXAMPLE_MASTER=${MASTER:-"local[*]"}

# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

if $cygwin; then
CLASSPATH=`cygpath -wp $CLASSPATH`
export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi

# Find java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
if [ -n "$1" ]; then
EXAMPLE_CLASS="$1"
shift
else
echo "usage: ./bin/run-example <example-class> [example-args]"
echo " - set MASTER=XX to use a specific master"
echo " - can use abbreviated example class name (e.g. SparkPi, mllib.MovieLensALS)"
echo
exit -1
fi
export JAVA_OPTS

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
echo -n "Spark Command: "
echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
echo "========================================"
echo
if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
./bin/spark-submit \
--master $EXAMPLE_MASTER \
--class $EXAMPLE_CLASS \
$SPARK_EXAMPLES_JAR \
"$@"
2 changes: 1 addition & 1 deletion bin/spark-class
@@ -114,7 +114,7 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar")
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2
echo "You need to build Spark with 'sbt/sbt assembly' before running this program." >&2
echo "You need to build Spark before running this program." >&2
exit 1
fi
if [ "$num_jars" -gt "1" ]; then
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -160,14 +160,15 @@ object SparkSubmit {
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
OptionAssigner(args.name, YARN, true, clOption = "--name"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
@@ -188,8 +189,7 @@
OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
OptionAssigner(args.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name")
OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars")
)

// For client mode make any added jars immediately visible on the classpath
@@ -205,7 +205,8 @@
(clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) {
childArgs += (opt.clOption, opt.value)
} else if (opt.sysProp != null) {
}
if (opt.sysProp != null) {
sysProps.put(opt.sysProp, opt.value)
}
}
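
Note on the SparkSubmit hunk above: an OptionAssigner can now populate both a child launcher argument and a system property for the same value (the `else if` became a second `if`), which is how `--name` on YARN ends up as both `--name` and `spark.app.name`. A minimal sketch of that dispatch behavior, using a simplified stand-in for OptionAssigner rather than the real SparkSubmit types:

    // Simplified stand-in for SparkSubmit's OptionAssigner dispatch; the real code also
    // filters on deploy mode and cluster manager. The point: clOption and sysProp are
    // applied independently now, not as an either/or.
    case class OptionAssigner(value: String, clOption: String = null, sysProp: String = null)

    object DispatchSketch {
      def dispatch(options: Seq[OptionAssigner]): (Seq[String], Map[String, String]) = {
        val childArgs = scala.collection.mutable.ArrayBuffer[String]()
        val sysProps = scala.collection.mutable.HashMap[String, String]()
        for (opt <- options if opt.value != null) {
          if (opt.clOption != null) childArgs ++= Seq(opt.clOption, opt.value)
          if (opt.sysProp != null) sysProps(opt.sysProp) = opt.value  // no longer an `else if`
        }
        (childArgs.toSeq, sysProps.toMap)
      }

      def main(args: Array[String]): Unit = {
        val (cargs, props) =
          dispatch(Seq(OptionAssigner("beauty", clOption = "--name", sysProp = "spark.app.name")))
        println(cargs)  // contains --name, beauty
        println(props)  // Map(spark.app.name -> beauty)
      }
    }
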
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -330,9 +330,9 @@ abstract class RDD[T: ClassTag](
if (shuffle) {
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
new ShuffledRDD[Int, T, (Int, T)](map(x => (Utils.random.nextInt(), x)),
new HashPartitioner(numPartitions)),
numPartitions).keys
numPartitions).values
} else {
new CoalescedRDD(this, numPartitions)
}
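
The coalesce hunk above replaces `(element, null)` keys with random Int keys and takes `.values` instead of `.keys`, so elements are spread evenly across the new partitions rather than grouped by their own hash. A plain-Scala sketch of the idea (no SparkContext; names here are illustrative only, not the real ShuffledRDD/CoalescedRDD machinery):

    // Illustration of the new shuffle path in coalesce(n, shuffle = true): key each element
    // with a random Int, hash-partition on the key, then keep only the values.
    import scala.util.Random

    object CoalesceShuffleSketch {
      def spread[T](data: Seq[T], numPartitions: Int, seed: Long = 42L): Map[Int, Seq[T]] = {
        val rng = new Random(seed)
        data
          .map(x => (rng.nextInt(), x))                                        // (random key, element)
          .groupBy { case (k, _) => (k % numPartitions + numPartitions) % numPartitions } // HashPartitioner-style bucket
          .map { case (part, kvs) => part -> kvs.map(_._2) }                   // drop the keys, as .values does
      }

      def main(args: Array[String]): Unit = {
        spread(1 to 20, numPartitions = 4).toSeq.sortBy(_._1).foreach {
          case (part, xs) => println(s"partition $part -> ${xs.mkString(", ")}")
        }
      }
    }
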
@@ -57,15 +57,12 @@ private[spark] object ShuffleMapTask {
}

def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
synchronized {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val ser = SparkEnv.get.closureSerializer.newInstance()
val objIn = ser.deserializeStream(in)
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
(rdd, dep)
}
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val ser = SparkEnv.get.closureSerializer.newInstance()
val objIn = ser.deserializeStream(in)
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
(rdd, dep)
}

// Since both the JarSet and FileSet have the same format this is used for both.
@@ -250,7 +250,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
// Remove the block from the slave's BlockManager.
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
blockManager.get.slaveActor ! RemoveBlock(blockId)
blockManager.get.slaveActor.ask(RemoveBlock(blockId))(akkaTimeout)
}
}
}
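
The one-line change above swaps Akka's fire-and-forget `!` for `ask`, which returns a Future bounded by `akkaTimeout`. A self-contained sketch of the difference, assuming classic Akka actors and a made-up EchoActor (not Spark code):

    // `!` (tell) sends and forgets; `ask` hands back a Future that completes with the
    // reply or fails after the timeout, which is what the removeBlock path above now gets.
    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    class EchoActor extends Actor {
      def receive = { case msg => sender() ! s"echo: $msg" }
    }

    object TellVsAsk extends App {
      val system = ActorSystem("sketch")
      val echo = system.actorOf(Props(new EchoActor), "echo")
      implicit val timeout: Timeout = Timeout(5.seconds)

      echo ! "fire-and-forget"                        // no handle on the reply

      val reply = echo.ask("please reply")            // Future[Any]
      println(Await.result(reply, timeout.duration))  // echo: please reply

      system.terminate()
    }
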
@@ -104,7 +104,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
"--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
"--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
"--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
"--archives", "archive1.txt,archive2.txt", "--num-executors", "6",
"--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "beauty",
"thejar.jar", "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
@@ -122,16 +122,17 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
childArgsStr should include ("--num-executors 6")
mainClass should be ("org.apache.spark.deploy.yarn.Client")
classpath should have length (0)
sysProps should have size (1)
sysProps("spark.app.name") should be ("beauty")
sysProps("SPARK_SUBMIT") should be ("true")
}

test("handles YARN client mode") {
val clArgs = Seq("--deploy-mode", "client",
"--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
"--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
"--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
"--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "thejar.jar",
"arg1", "arg2")
"--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "trill",
"thejar.jar", "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
@@ -140,6 +141,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
classpath should contain ("one.jar")
classpath should contain ("two.jar")
classpath should contain ("three.jar")
sysProps("spark.app.name") should be ("trill")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.executor.cores") should be ("5")
sysProps("spark.yarn.queue") should be ("thequeue")
2 changes: 1 addition & 1 deletion docs/building-with-maven.md
@@ -96,7 +96,7 @@ Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.o

The ScalaTest plugin also supports running only a specific test suite as follows:

$ mvn -Dhadoop.version=... -Dsuites=org.apache.spark.repl.ReplSuite test
$ mvn -Dhadoop.version=... -DwildcardSuites=org.apache.spark.repl.ReplSuite test


## Continuous Compilation ##
8 changes: 4 additions & 4 deletions docs/mllib-basics.md
@@ -184,28 +184,28 @@ After loading, the feature indices are converted to zero-based.
<div class="codetabs">
<div data-lang="scala" markdown="1">

[`MLUtils.loadLibSVMData`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$) reads training
[`MLUtils.loadLibSVMFile`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$) reads training
examples stored in LIBSVM format.

{% highlight scala %}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val training: RDD[LabeledPoint] = MLUtils.loadLibSVMData(sc, "mllib/data/sample_libsvm_data.txt")
val training: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt")
{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
[`MLUtils.loadLibSVMData`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$) reads training
[`MLUtils.loadLibSVMFile`](api/mllib/index.html#org.apache.spark.mllib.util.MLUtils$) reads training
examples stored in LIBSVM format.

{% highlight java %}
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.rdd.RDD;

RDD<LabeledPoint> training = MLUtils.loadLibSVMData(jsc, "mllib/data/sample_libsvm_data.txt");
RDD<LabeledPoint> training = MLUtils.loadLibSVMFile(jsc, "mllib/data/sample_libsvm_data.txt");
{% endhighlight %}
</div>
</div>
2 changes: 1 addition & 1 deletion docs/mllib-linear-methods.md
@@ -186,7 +186,7 @@ import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMData(sc, "mllib/data/sample_libsvm_data.txt")
val data = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
2 changes: 1 addition & 1 deletion docs/running-on-yarn.md
@@ -53,7 +53,7 @@ For example:
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1
examples/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
lib/spark-examples*.jar \
10

The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of the Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the "Viewing Logs" section below for how to see driver and executor logs.
2 changes: 2 additions & 0 deletions make-distribution.sh
@@ -40,6 +40,8 @@
#

set -o pipefail
set -e

# Figure out where the Spark framework is installed
FWDIR="$(cd `dirname $0`; pwd)"
DISTDIR="$FWDIR/dist"
@@ -136,7 +136,11 @@ object Vectors {
new DenseVector(v.toArray) // Can't use underlying array directly, so make a new one
}
case v: BSV[Double] =>
new SparseVector(v.length, v.index, v.data)
if (v.index.length == v.used) {
new SparseVector(v.length, v.index, v.data)
} else {
new SparseVector(v.length, v.index.slice(0, v.used), v.data.slice(0, v.used))
}
case v: BV[_] =>
sys.error("Unsupported Breeze vector type: " + v.getClass.getName)
}
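
A note on the Vectors hunk above: Breeze sparse vectors may keep `index`/`data` arrays longer than the number of active entries (`used`), so the conversion slices them when they are over-allocated. A small sketch of that logic, using a hypothetical stand-in type instead of the real Breeze class:

    // Copying the arrays verbatim would pull stale tail entries into the MLlib
    // SparseVector. `BreezeLikeSparse` is a stand-in for breeze.linalg.SparseVector.
    final case class BreezeLikeSparse(length: Int, index: Array[Int], data: Array[Double], used: Int)
    final case class MlSparse(size: Int, indices: Array[Int], values: Array[Double])

    object SparseConversionSketch {
      def fromBreezeLike(v: BreezeLikeSparse): MlSparse = {
        if (v.index.length == v.used) {
          MlSparse(v.length, v.index, v.data)                                    // arrays exactly full: reuse them
        } else {
          MlSparse(v.length, v.index.slice(0, v.used), v.data.slice(0, v.used))  // drop the unused tail
        }
      }

      def main(args: Array[String]): Unit = {
        // Two active entries (0 -> 1.0, 3 -> 2.0), but arrays pre-sized for four.
        val v = BreezeLikeSparse(5, Array(0, 3, 0, 0), Array(1.0, 2.0, 0.0, 0.0), used = 2)
        val s = fromBreezeLike(v)
        println(s.indices.mkString(",") + " / " + s.values.mkString(","))  // 0,3 / 1.0,2.0
      }
    }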