[SPARK-1774] Respect SparkSubmit --jars on YARN (client) #710

Closed · wants to merge 7 commits
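In short: jars passed to spark-submit via --jars should end up on spark.jars (and hence on the driver and executor classpath) in yarn-client mode, as they already do for standalone and Mesos. The following is an illustrative argument list in the style of the SparkSubmitSuite tests below, not text taken from the PR description:

```scala
// Illustrative only: the invocation shape whose --jars are now respected in yarn-client mode.
val clArgs = Seq(
  "--master", "yarn",
  "--deploy-mode", "client",
  "--jars", "one.jar,two.jar,three.jar",
  "thejar.jar")
// After this change, SparkSubmit sets spark.jars = "one.jar,two.jar,three.jar,thejar.jar"
// (see the "handles YARN client mode" test assertion below).
```
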
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging {
if (SparkHadoopUtil.get.isYarnMode() &&
(master == "yarn-standalone" || master == "yarn-cluster")) {
// In order for this to work in yarn-cluster mode the user must specify the
// --addjars option to the client to upload the file into the distributed cache
// --addJars option to the client to upload the file into the distributed cache
// of the AM to make it show up in the current working directory.
val fileName = new Path(uri.getPath).getName()
try {
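The comment above explains why only the file's base name is used in yarn-cluster mode: a jar uploaded with --addJars lands in the AM container's working directory. A minimal sketch of that base-name resolution, assuming only java.net.URI and Hadoop's Path (the function name is hypothetical, not part of SparkContext):

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Sketch: once --addJars has placed the file in the AM's working directory,
// only its base name is needed to reference it there.
def yarnClusterLocalName(uriString: String): String = {
  val uri = new URI(uriString)
  new Path(uri.getPath).getName() // e.g. "hdfs://nn/user/me/dep.jar" -> "dep.jar"
}
```
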
39 changes: 23 additions & 16 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -67,8 +67,7 @@ object SparkSubmit {
private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)

/**
* @return
* a tuple containing the arguments for the child, a list of classpath
* @return a tuple containing the arguments for the child, a list of classpath
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
*/
@@ -115,13 +114,16 @@ object SparkSubmit {
val sysProps = new HashMap[String, String]()
var childMainClass = ""

val isPython = args.isPython
val isYarnCluster = clusterManager == YARN && deployOnCluster

if (clusterManager == MESOS && deployOnCluster) {
printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}

// If we're running a Python app, set the Java class to run to be our PythonRunner, add
// Python files to deployment list, and pass the main file and Python path to PythonRunner
if (args.isPython) {
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cannot currently run Python driver programs on cluster")
}
@@ -161,35 +163,35 @@ object SparkSubmit {
val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
sysProp = "spark.executor.memory"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.files, YARN, true, clOption = "--files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars")
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
Author review comment: This is the line that makes SparkSubmit respect --jars for yarn-client mode.

)
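For orientation, each OptionAssigner row above declares which cluster managers and deploy mode a CLI argument applies to, and whether it becomes a flag for the child process, a system property, or both. A simplified sketch of how such a rule could be applied (field names mirror the diff, but this is illustrative, not the exact SparkSubmit implementation):

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Simplified, illustrative version of the rule type used above.
case class OptionAssigner(
    value: String,
    clusterManager: Int,
    deployOnCluster: Boolean,
    clOption: String = null,
    sysProp: String = null)

// A rule fires only when it carries a value and matches the current cluster
// manager (a bit flag) and deploy mode; it then contributes a child CLI flag,
// a system property, or both.
def applyRule(
    rule: OptionAssigner,
    clusterManager: Int,
    deployOnCluster: Boolean,
    childArgs: ArrayBuffer[String],
    sysProps: HashMap[String, String]): Unit = {
  if (rule.value != null &&
      rule.deployOnCluster == deployOnCluster &&
      (rule.clusterManager & clusterManager) != 0) {
    if (rule.clOption != null) childArgs ++= Seq(rule.clOption, rule.value)
    if (rule.sysProp != null) sysProps(rule.sysProp) = rule.value
  }
}
```

Under this reading, the new spark.jars rule applies to all cluster managers in client mode, which is exactly what the author's comment above points out.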

// For client mode make any added jars immediately visible on the classpath
@@ -212,21 +214,22 @@
}
}

// For standalone mode, add the application jar automatically so the user doesn't have to
// call sc.addJar. TODO: Standalone mode in the cluster
if (clusterManager == STANDALONE) {
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
Author review comment: This is the line that adds the application jar automatically for all modes, except for yarn-cluster and python. (Previously, this was done only for standalone.)

var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
if (args.primaryResource != RESERVED_JAR_NAME) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
}
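The merging this block performs can be shown in isolation. A self-contained sketch, assuming RESERVED_JAR_NAME is the placeholder used when no real application jar is supplied (its literal value is not shown in this diff, so it is passed in as a parameter here):

```scala
// Sketch of the merge above: append the application jar to whatever is already
// under spark.jars, unless the primary resource is only the reserved placeholder.
def mergeAppJar(
    existingSparkJars: Option[String],
    primaryResource: String,
    reservedJarName: String): String = {
  val jars = existingSparkJars.map(_.split(",").toSeq).getOrElse(Seq.empty)
  val merged =
    if (primaryResource != reservedJarName) jars :+ primaryResource else jars
  merged.mkString(",")
}

// mergeAppJar(Some("one.jar,two.jar,three.jar"), "thejar.jar", "<reserved>")
// returns "one.jar,two.jar,three.jar,thejar.jar", matching the yarn-client test below.
```
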

// Standalone cluster specific configurations
if (deployOnCluster && clusterManager == STANDALONE) {
if (args.supervise) {
childArgs += "--supervise"
}

childMainClass = "org.apache.spark.deploy.Client"
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
@@ -243,16 +246,20 @@
}
}

// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
if (!sysProps.contains(k)) sysProps(k) = v
}

(childArgs, childClasspath, sysProps, childMainClass)
}

private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
{
private def launch(
childArgs: ArrayBuffer[String],
childClasspath: ArrayBuffer[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean = false) {
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
110 changes: 77 additions & 33 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -87,25 +87,41 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}

test("handles arguments with --key=val") {
val clArgs = Seq("--jars=one.jar,two.jar,three.jar", "--name=myApp")
val clArgs = Seq(
"--jars=one.jar,two.jar,three.jar",
"--name=myApp")
val appArgs = new SparkSubmitArguments(clArgs)
appArgs.jars should be ("one.jar,two.jar,three.jar")
appArgs.name should be ("myApp")
}

test("handles arguments to user program") {
val clArgs = Seq("--name", "myApp", "--class", "Foo", "userjar.jar", "some", "--weird", "args")
val clArgs = Seq(
"--name", "myApp",
"--class", "Foo",
"userjar.jar",
"some",
"--weird", "args")
val appArgs = new SparkSubmitArguments(clArgs)
appArgs.childArgs should be (Seq("some", "--weird", "args"))
}

test("handles YARN cluster mode") {
val clArgs = Seq("--deploy-mode", "cluster",
"--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
"--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
"--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
"--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "beauty",
"thejar.jar", "arg1", "arg2")
val clArgs = Seq(
"--deploy-mode", "cluster",
"--master", "yarn",
"--executor-memory", "5g",
"--executor-cores", "5",
"--class", "org.SomeClass",
"--jars", "one.jar,two.jar,three.jar",
"--driver-memory", "4g",
"--queue", "thequeue",
"--files", "file1.txt,file2.txt",
"--archives", "archive1.txt,archive2.txt",
"--num-executors", "6",
"--name", "beauty",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
val childArgsStr = childArgs.mkString(" ")
@@ -127,12 +143,21 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}

test("handles YARN client mode") {
val clArgs = Seq("--deploy-mode", "client",
"--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
"--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
"--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
"--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "--name", "trill",
"thejar.jar", "arg1", "arg2")
val clArgs = Seq(
"--deploy-mode", "client",
"--master", "yarn",
"--executor-memory", "5g",
"--executor-cores", "5",
"--class", "org.SomeClass",
"--jars", "one.jar,two.jar,three.jar",
"--driver-memory", "4g",
"--queue", "thequeue",
"--files", "file1.txt,file2.txt",
"--archives", "archive1.txt,archive2.txt",
"--num-executors", "6",
"--name", "trill",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
Expand All @@ -142,6 +167,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
classpath should contain ("two.jar")
classpath should contain ("three.jar")
sysProps("spark.app.name") should be ("trill")
sysProps("spark.jars") should be ("one.jar,two.jar,three.jar,thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.executor.cores") should be ("5")
sysProps("spark.yarn.queue") should be ("thequeue")
@@ -152,9 +178,15 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}

test("handles standalone cluster mode") {
val clArgs = Seq("--deploy-mode", "cluster",
"--master", "spark://h:p", "--class", "org.SomeClass",
"--supervise", "--driver-memory", "4g", "--driver-cores", "5", "thejar.jar", "arg1", "arg2")
val clArgs = Seq(
"--deploy-mode", "cluster",
"--master", "spark://h:p",
"--class", "org.SomeClass",
"--supervise",
"--driver-memory", "4g",
"--driver-cores", "5",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
val childArgsStr = childArgs.mkString(" ")
@@ -166,9 +198,15 @@
}

test("handles standalone client mode") {
val clArgs = Seq("--deploy-mode", "client",
"--master", "spark://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
"--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
val clArgs = Seq(
"--deploy-mode", "client",
"--master", "spark://h:p",
"--executor-memory", "5g",
"--total-executor-cores", "5",
"--class", "org.SomeClass",
"--driver-memory", "4g",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
@@ -179,9 +217,15 @@
}

test("handles mesos client mode") {
val clArgs = Seq("--deploy-mode", "client",
"--master", "mesos://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
"--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
val clArgs = Seq(
"--deploy-mode", "client",
"--master", "mesos://h:p",
"--executor-memory", "5g",
"--total-executor-cores", "5",
"--class", "org.SomeClass",
"--driver-memory", "4g",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
@@ -192,15 +236,17 @@
}

test("launch simple application with spark-submit") {
runSparkSubmit(
Seq(
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local",
"unUsed.jar"))
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local",
unusedJar.toString)
runSparkSubmit(args)
}

test("spark submit includes jars passed in through --jar") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
Expand All @@ -209,7 +255,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
"--name", "testApp",
"--master", "local-cluster[2,1,512]",
"--jars", jarsString,
"unused.jar")
unusedJar.toString)
runSparkSubmit(args)
}

Expand All @@ -227,7 +273,7 @@ object JarCreationTest {
def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
val result = sc.makeRDD(1 to 100, 10).mapPartitions{ x =>
val result = sc.makeRDD(1 to 100, 10).mapPartitions { x =>
var foundClasses = false
try {
Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader)
@@ -248,7 +294,6 @@ object SimpleApplicationTest {
def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)

val configs = Seq("spark.master", "spark.app.name")
for (config <- configs) {
val masterValue = conf.get(config)
@@ -266,6 +311,5 @@
s"Master had $config=$masterValue but executor had $config=$executorValue")
}
}

}
}
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
import org.apache.spark.scheduler.InputFormatInfo
import org.apache.spark.util.IntParam
import org.apache.spark.util.MemoryParam

@@ -40,9 +40,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var amMemory: Int = 512 // MB
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
var appName: String = "Spark"
// TODO
var inputFormatInfo: List[InputFormatInfo] = null
// TODO(harvey)
var priority = 0

parseArgs(args.toList)