Skip to content


[SPARK-2423] Clean up SparkSubmit for readability
Browse files Browse the repository at this point in the history
It is currently non-trivial to trace through how different combinations of cluster managers (e.g. yarn) and deploy modes (e.g. cluster) are processed in SparkSubmit. Moving forward, it will be easier to extend SparkSubmit if we first re-organize the code by grouping related logic together.

This is a precursor to fixing standalone-cluster mode, which is currently broken (SPARK-2260).

Author: Andrew Or <[email protected]>

Closes apache#1349 from andrewor14/submit-cleanup and squashes the following commits:

8f99200 [Andrew Or] script -> program (minor)
30f2e65 [Andrew Or] Merge branch 'master' of into submit-cleanup
fe484a1 [Andrew Or] Move deploy mode checks after yarn code
7167824 [Andrew Or] Re-order config options and update comments
0b01ff8 [Andrew Or] Clean up SparkSubmit for readability
  • Loading branch information
andrewor14 authored and conviva-zz committed Sep 4, 2014
1 parent 3d44eac commit fcb1881
Showing 1 changed file with 145 additions and 144 deletions.
289 changes: 145 additions & 144 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,39 @@ import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils

* Scala code behind the spark-submit script. The script handles setting up the classpath with
* relevant Spark dependencies and provides a layer over the different cluster managers and deploy
* modes that Spark supports.
* Main gateway of launching a Spark application.
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
object SparkSubmit {

// Cluster managers
private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8

private var clusterManager: Int = LOCAL
// Deploy modes
private val CLIENT = 1
private val CLUSTER = 2

* Special primary resource names that represent shells rather than application jars.
// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"

// Exposed for testing
private[spark] var exitFn: () => Unit = () => System.exit(-1)
private[spark] var printStream: PrintStream = System.err
private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
private[spark] def printErrorAndExit(str: String) = {
printStream.println("Error: " + str)
printStream.println("Run with --help for usage help or --verbose for debug output")

def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
Expand All @@ -55,88 +69,80 @@ object SparkSubmit {
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)

// Exposed for testing
private[spark] var printStream: PrintStream = System.err
private[spark] var exitFn: () => Unit = () => System.exit(-1)

private[spark] def printErrorAndExit(str: String) = {
printStream.println("Error: " + str)
printStream.println("Run with --help for usage help or --verbose for debug output")
private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)

* @return a tuple containing the arguments for the child, a list of classpath
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
* @return a tuple containing
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a list of system properties and env vars, and
* (4) the main class for the child
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
if (args.master.startsWith("local")) {
clusterManager = LOCAL
} else if (args.master.startsWith("yarn")) {
clusterManager = YARN
} else if (args.master.startsWith("spark")) {
clusterManager = STANDALONE
} else if (args.master.startsWith("mesos")) {
clusterManager = MESOS
} else {
printErrorAndExit("Master must start with yarn, mesos, spark, or local")

// Because "yarn-cluster" and "yarn-client" encapsulate both the master
// and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
if (args.deployMode == null &&
(args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
args.deployMode = "cluster"
if (args.deployMode == "cluster" && args.master == "yarn-client") {
printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
if (args.deployMode == "client" &&
(args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
printErrorAndExit("Deploy mode \"client\" and master \"" + args.master
+ "\" are not compatible")
if (args.deployMode == "cluster" && args.master.startsWith("yarn")) {
args.master = "yarn-cluster"
if (args.deployMode != "cluster" && args.master.startsWith("yarn")) {
args.master = "yarn-client"

val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster"

val childClasspath = new ArrayBuffer[String]()
// Values to return
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
var childMainClass = ""

val isPython = args.isPython
val isYarnCluster = clusterManager == YARN && deployOnCluster
// Set the cluster manager
val clusterManager: Int = args.master match {
case m if m.startsWith("yarn") => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1

// For mesos, only client mode is supported
if (clusterManager == MESOS && deployOnCluster) {
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
// Set the deploy mode; default is client mode
var deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1

// For standalone, only client mode is supported
if (clusterManager == STANDALONE && deployOnCluster) {
printErrorAndExit("Cluster deploy mode is currently not supported for standalone clusters.")
// Because "yarn-cluster" and "yarn-client" encapsulate both the master
// and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
if (clusterManager == YARN) {
if (args.master == "yarn-standalone") {
printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
args.master = "yarn-cluster"
(args.master, args.deployMode) match {
case ("yarn-cluster", null) =>
deployMode = CLUSTER
case ("yarn-cluster", "client") =>
printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
case ("yarn-client", "cluster") =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
case (_, mode) =>
args.master = "yarn-" + Option(mode).getOrElse("client")

// Make sure YARN is included in our build if we're trying to use it
if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
"Could not load YARN classes. " +
"This copy of Spark may not have been compiled with YARN support.")

// For shells, only client mode is applicable
if (isShell(args.primaryResource) && deployOnCluster) {
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
case (_, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case _ =>

// If we're running a python app, set the main class to our specific python runner
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cluster deploy mode is currently not supported for python.")
if (args.isPython) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
Expand All @@ -152,120 +158,115 @@ object SparkSubmit {
sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")

// If we're deploying into YARN, use yarn.Client as a wrapper around the user class
if (!deployOnCluster) {
childMainClass = args.mainClass
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
} else if (clusterManager == YARN) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
childArgs += ("--jar", args.primaryResource)
childArgs += ("--class", args.mainClass)

// Make sure YARN is included in our build if we're trying to use it
if (clusterManager == YARN) {
if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
printErrorAndExit("Could not load YARN classes. " +
"This copy of Spark may not have been compiled with YARN support.")

// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
OptionAssigner(, ALL_CLUSTER_MGRS, false, sysProp = ""),
OptionAssigner(, YARN, true, clOption = "--name", sysProp = ""),
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,

// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
OptionAssigner(, ALL_CLUSTER_MGRS, CLIENT, sysProp = ""),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),

// Standalone cluster only
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

// Yarn cluster only
OptionAssigner(, YARN, CLUSTER, clOption = "--name", sysProp = ""),
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),

// Other options
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
sysProp = "spark.executor.memory"),
OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.files, YARN, true, clOption = "--files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files")

// For client mode make any added jars immediately visible on the classpath
if (args.jars != null && !deployOnCluster) {
for (jar <- args.jars.split(",")) {
childClasspath += jar
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
if (args.jars != null) { childClasspath ++= args.jars.split(",") }
if (args.childArgs != null) { childArgs ++= args.childArgs }

// Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
if (opt.value != null &&
(deployMode & opt.deployMode) != 0 &&
(clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) {
childArgs += (opt.clOption, opt.value)
if (opt.sysProp != null) {
sysProps.put(opt.sysProp, opt.value)
if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }

// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
if (!isYarnCluster && !args.isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
sysProps.put("spark.jars", jars.mkString(","))

// Standalone cluster specific configurations
if (deployOnCluster && clusterManager == STANDALONE) {
// In standalone-cluster mode, use Client as a wrapper around the user class
if (clusterManager == STANDALONE && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) {
childArgs += "--supervise"
childMainClass = "org.apache.spark.deploy.Client"
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
if (args.childArgs != null) {
childArgs ++= args.childArgs

// Arguments to be passed to user program
if (args.childArgs != null) {
if (!deployOnCluster || clusterManager == STANDALONE) {
childArgs ++= args.childArgs
} else if (clusterManager == YARN) {
for (arg <- args.childArgs) {
childArgs += ("--arg", arg)
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
childArgs += ("--jar", args.primaryResource)
childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }

// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
if (!sysProps.contains(k)) sysProps(k) = v
sysProps.getOrElseUpdate(k, v)

(childArgs, childClasspath, sysProps, childMainClass)
Expand Down Expand Up @@ -364,6 +365,6 @@ object SparkSubmit {
private[spark] case class OptionAssigner(
value: String,
clusterManager: Int,
deployOnCluster: Boolean,
deployMode: Int,
clOption: String = null,
sysProp: String = null)

0 comments on commit fcb1881

Please sign in to comment.