From 3bb75e8a46191ba0c8c18875426857d5ddef3445 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 8 May 2014 21:25:55 -0700 Subject: [PATCH 1/5] Allow SparkSubmit --jars to take effect in yarn-client mode --- .../org/apache/spark/deploy/SparkSubmit.scala | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e39723f38347c..08a851b0f8d3a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -188,7 +188,7 @@ object SparkSubmit { OptionAssigner(args.jars, YARN, true, clOption = "--addJars"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), - OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"), + OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars"), OptionAssigner(args.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name") ) @@ -211,15 +211,12 @@ object SparkSubmit { } } - // For standalone mode, add the application jar automatically so the user doesn't have to - // call sc.addJar. TODO: Standalone mode in the cluster - if (clusterManager == STANDALONE) { - var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) - if (args.primaryResource != RESERVED_JAR_NAME) { - jars = jars ++ Seq(args.primaryResource) - } - sysProps.put("spark.jars", jars.mkString(",")) + // Add the application jar automatically so the user doesn't have to call sc.addJar + var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) + if (args.primaryResource != RESERVED_JAR_NAME) { + jars = jars ++ Seq(args.primaryResource) } + sysProps.put("spark.jars", jars.mkString(",")) if (deployOnCluster && clusterManager == STANDALONE) { if (args.supervise) { From 013d8405dfb6774e1969dfec0cf9441563a1d409 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 8 May 2014 21:55:26 -0700 Subject: [PATCH 2/5] Fix tests --- .../apache/spark/deploy/SparkSubmitSuite.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index c9edb03cdeb0f..73a89f4acb0ed 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -123,6 +123,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { mainClass should be ("org.apache.spark.deploy.yarn.Client") classpath should have length (0) sysProps("spark.app.name") should be ("beauty") + sysProps("spark.jars") should be ("thejar.jar") sysProps("SPARK_SUBMIT") should be ("true") } @@ -142,6 +143,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { classpath should contain ("two.jar") classpath should contain ("three.jar") sysProps("spark.app.name") should be ("trill") + sysProps("spark.jars") should be ("one.jar,two.jar,three.jar,thejar.jar") sysProps("spark.executor.memory") should be ("5g") sysProps("spark.executor.cores") should be ("5") sysProps("spark.yarn.queue") should be ("thequeue") @@ -192,15 +194,17 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { } test("launch simple application with spark-submit") { - runSparkSubmit( - Seq( - "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"), - "--name", "testApp", - "--master", "local", - "unUsed.jar")) + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val args = Seq( + "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--master", "local", + unusedJar.toString) + runSparkSubmit(args) } test("spark submit includes jars passed in through --jar") { + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA")) val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB")) val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",") @@ -209,7 +213,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { "--name", "testApp", "--master", "local-cluster[2,1,512]", "--jars", jarsString, - "unused.jar") + unusedJar.toString) runSparkSubmit(args) } From 269f9f397d821abd337982f0af7f90844695b5bf Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 8 May 2014 22:06:32 -0700 Subject: [PATCH 3/5] Fix format --- .../org/apache/spark/deploy/SparkSubmit.scala | 12 ++- .../spark/deploy/SparkSubmitSuite.scala | 92 +++++++++++++------ 2 files changed, 73 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index ef6c50f01b3c7..a0623527f85be 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -67,8 +67,7 @@ object SparkSubmit { private[spark] def printWarning(str: String) = printStream.println("Warning: " + str) /** - * @return - * a tuple containing the arguments for the child, a list of classpath + * @return a tuple containing the arguments for the child, a list of classpath * entries for the child, a list of system propertes, a list of env vars * and the main class for the child */ @@ -248,9 +247,12 @@ object SparkSubmit { (childArgs, childClasspath, sysProps, childMainClass) } - private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String], - sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) - { + private def launch( + childArgs: ArrayBuffer[String], + childClasspath: ArrayBuffer[String], + sysProps: Map[String, String], + childMainClass: String, + verbose: Boolean = false) { if (verbose) { printStream.println(s"Main class:\n$childMainClass") printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 73a89f4acb0ed..498f7ce41338f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -87,25 +87,40 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { } test("handles arguments with --key=val") { - val clArgs = Seq("--jars=one.jar,two.jar,three.jar", "--name=myApp") + val clArgs = Seq( + "--jars=one.jar,two.jar,three.jar", + "--name=myApp") val appArgs = new SparkSubmitArguments(clArgs) appArgs.jars should be ("one.jar,two.jar,three.jar") appArgs.name should be ("myApp") } test("handles arguments to user program") { - val clArgs = Seq("--name", "myApp", "--class", "Foo", "userjar.jar", "some", "--weird", "args") + val clArgs = Seq( + "--name", "myApp", + "--class", "Foo", + "userjar.jar", "some", + "--weird", "args") val appArgs = new SparkSubmitArguments(clArgs) appArgs.childArgs should be (Seq("some", "--weird", "args")) } test("handles YARN cluster mode") { - val clArgs = Seq("--deploy-mode", "cluster", - "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5", - "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar", - "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt", - "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "beauty", - "thejar.jar", "arg1", "arg2") + val clArgs = Seq( + "--deploy-mode", "cluster", + "--master", "yarn", + "--executor-memory", "5g", + "--executor-cores", "5", + "--class", "org.SomeClass", + "--jars", "one.jar,two.jar,three.jar", + "--driver-memory", "4g", + "--queue", "thequeue", + "--files", "file1.txt,file2.txt", + "--archives", "archive1.txt,archive2.txt", + "--num-executors", "6", + "--name", "beauty", + "thejar.jar", + "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) val childArgsStr = childArgs.mkString(" ") @@ -128,12 +143,21 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { } test("handles YARN client mode") { - val clArgs = Seq("--deploy-mode", "client", - "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5", - "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar", - "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt", - "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "trill", - "thejar.jar", "arg1", "arg2") + val clArgs = Seq( + "--deploy-mode", "client", + "--master", "yarn", + "--executor-memory", "5g", + "--executor-cores", "5", + "--class", "org.SomeClass", + "--jars", "one.jar,two.jar,three.jar", + "--driver-memory", "4g", + "--queue", "thequeue", + "--files", "file1.txt,file2.txt", + "--archives", "archive1.txt,archive2.txt", + "--num-executors", "6", + "--name", "trill", + "thejar.jar", + "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) childArgs.mkString(" ") should be ("arg1 arg2") @@ -154,9 +178,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { } test("handles standalone cluster mode") { - val clArgs = Seq("--deploy-mode", "cluster", - "--master", "spark://h:p", "--class", "org.SomeClass", - "--supervise", "--driver-memory", "4g", "--driver-cores", "5", "thejar.jar", "arg1", "arg2") + val clArgs = Seq( + "--deploy-mode", "cluster", + "--master", "spark://h:p", + "--class", "org.SomeClass", + "--supervise", + "--driver-memory", "4g", + "--driver-cores", "5", + "thejar.jar", + "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) val childArgsStr = childArgs.mkString(" ") @@ -168,9 +198,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { } test("handles standalone client mode") { - val clArgs = Seq("--deploy-mode", "client", - "--master", "spark://h:p", "--executor-memory", "5g", "--total-executor-cores", "5", - "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2") + val clArgs = Seq( + "--deploy-mode", "client", + "--master", "spark://h:p", + "--executor-memory", "5g", + "--total-executor-cores", "5", + "--class", "org.SomeClass", + "--driver-memory", "4g", + "thejar.jar", + "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) childArgs.mkString(" ") should be ("arg1 arg2") @@ -181,9 +217,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { } test("handles mesos client mode") { - val clArgs = Seq("--deploy-mode", "client", - "--master", "mesos://h:p", "--executor-memory", "5g", "--total-executor-cores", "5", - "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2") + val clArgs = Seq( + "--deploy-mode", "client", + "--master", "mesos://h:p", + "--executor-memory", "5g", + "--total-executor-cores", "5", + "--class", "org.SomeClass", + "--driver-memory", "4g", + "thejar.jar", + "arg1", "arg2") val appArgs = new SparkSubmitArguments(clArgs) val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) childArgs.mkString(" ") should be ("arg1 arg2") @@ -231,7 +273,7 @@ object JarCreationTest { def main(args: Array[String]) { val conf = new SparkConf() val sc = new SparkContext(conf) - val result = sc.makeRDD(1 to 100, 10).mapPartitions{ x => + val result = sc.makeRDD(1 to 100, 10).mapPartitions { x => var foundClasses = false try { Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader) @@ -252,7 +294,6 @@ object SimpleApplicationTest { def main(args: Array[String]) { val conf = new SparkConf() val sc = new SparkContext(conf) - val configs = Seq("spark.master", "spark.app.name") for (config <- configs) { val masterValue = conf.get(config) @@ -270,6 +311,5 @@ object SimpleApplicationTest { s"Master had $config=$masterValue but executor had $config=$executorValue") } } - } } From c92c5bf7b12e7da4a138c6bcc4cc7e05cd87e8e1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 9 May 2014 10:43:45 -0700 Subject: [PATCH 4/5] Minor cleanups --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9d7c2c8d3d630..0f94e2a998932 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging { if (SparkHadoopUtil.get.isYarnMode() && (master == "yarn-standalone" || master == "yarn-cluster")) { // In order for this to work in yarn-cluster mode the user must specify the - // --addjars option to the client to upload the file into the distributed cache + // --addJars option to the client to upload the file into the distributed cache // of the AM to make it show up in the current working directory. val fileName = new Path(uri.getPath).getName() try { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 3e4c739e34fe9..b2c413b6d267c 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.{ArrayBuffer, HashMap} import org.apache.spark.SparkConf -import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo} +import org.apache.spark.scheduler.InputFormatInfo import org.apache.spark.util.IntParam import org.apache.spark.util.MemoryParam @@ -40,9 +40,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { var amMemory: Int = 512 // MB var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" var appName: String = "Spark" - // TODO var inputFormatInfo: List[InputFormatInfo] = null - // TODO(harvey) var priority = 0 parseArgs(args.toList) From c27bf6ce83637d3a034ccc15803e19a844ae33bc Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 9 May 2014 11:44:51 -0700 Subject: [PATCH 5/5] For yarn-cluster and python, do not add primaryResource to spark.jar These scenarios already distribute the primary resource properly. See code comments for more detail. --- .../org/apache/spark/deploy/SparkSubmit.scala | 17 ++++++++++++----- .../apache/spark/deploy/SparkSubmitSuite.scala | 4 ++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index a0623527f85be..c6d3cbd2e728b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -114,13 +114,16 @@ object SparkSubmit { val sysProps = new HashMap[String, String]() var childMainClass = "" + val isPython = args.isPython + val isYarnCluster = clusterManager == YARN && deployOnCluster + if (clusterManager == MESOS && deployOnCluster) { printErrorAndExit("Cannot currently run driver on the cluster in Mesos") } // If we're running a Python app, set the Java class to run to be our PythonRunner, add // Python files to deployment list, and pass the main file and Python path to PythonRunner - if (args.isPython) { + if (isPython) { if (deployOnCluster) { printErrorAndExit("Cannot currently run Python driver programs on cluster") } @@ -212,11 +215,15 @@ object SparkSubmit { } // Add the application jar automatically so the user doesn't have to call sc.addJar - var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) - if (args.primaryResource != RESERVED_JAR_NAME) { - jars = jars ++ Seq(args.primaryResource) + // For YARN cluster mode, the jar is already distributed on each node as "app.jar" + // For python files, the primary resource is already distributed as a regular file + if (!isYarnCluster && !isPython) { + var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) + if (args.primaryResource != RESERVED_JAR_NAME) { + jars = jars ++ Seq(args.primaryResource) + } + sysProps.put("spark.jars", jars.mkString(",")) } - sysProps.put("spark.jars", jars.mkString(",")) // Standalone cluster specific configurations if (deployOnCluster && clusterManager == STANDALONE) { diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 498f7ce41338f..6c0deede53784 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -99,7 +99,8 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { val clArgs = Seq( "--name", "myApp", "--class", "Foo", - "userjar.jar", "some", + "userjar.jar", + "some", "--weird", "args") val appArgs = new SparkSubmitArguments(clArgs) appArgs.childArgs should be (Seq("some", "--weird", "args")) @@ -138,7 +139,6 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { mainClass should be ("org.apache.spark.deploy.yarn.Client") classpath should have length (0) sysProps("spark.app.name") should be ("beauty") - sysProps("spark.jars") should be ("thejar.jar") sysProps("SPARK_SUBMIT") should be ("true") }