
Commit

chutium committed Jul 17, 2014
2 parents 88cb37d + 9c73822 commit 094f773
Showing 5 changed files with 191 additions and 204 deletions.
289 changes: 145 additions & 144 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -27,25 +27,39 @@ import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils

/**
* Scala code behind the spark-submit script. The script handles setting up the classpath with
* relevant Spark dependencies and provides a layer over the different cluster managers and deploy
* modes that Spark supports.
* Main gateway of launching a Spark application.
*
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
*/
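// For orientation, a typical invocation that lands in this object might look
// like the following (a hedged example; the class name, jar path, and args
// are hypothetical):
//
//   ./bin/spark-submit \
//     --master yarn-cluster \
//     --class com.example.MyApp \
//     /path/to/app.jar arg1 arg2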
object SparkSubmit {

// Cluster managers
private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

private var clusterManager: Int = LOCAL
// Deploy modes
private val CLIENT = 1
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
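// The managers and modes above are bit flags, so one Int can describe a set
// of them and membership is a bitwise AND. A minimal sketch of the values
// implied by the definitions above (no extra API involved):
//
//   ALL_CLUSTER_MGRS == (1 | 2 | 4 | 8) == 15
//   (YARN & ALL_CLUSTER_MGRS) != 0        // true: YARN is in the set
//   (YARN & (STANDALONE | MESOS)) != 0    // false: 1 & 6 == 0
//
// The OptionAssigner matching loop further down relies on exactly this test.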

/**
* Special primary resource names that represent shells rather than application jars.
*/
// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"

// Exposed for testing
private[spark] var exitFn: () => Unit = () => System.exit(-1)
private[spark] var printStream: PrintStream = System.err
private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
private[spark] def printErrorAndExit(str: String) = {
printStream.println("Error: " + str)
printStream.println("Run with --help for usage help or --verbose for debug output")
exitFn()
}
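// A minimal sketch (not part of this commit) of how a test inside the spark
// package could use the hooks above: swap in a capturing stream and a
// non-exiting exitFn, then assert on the output. ScalaTest's intercept is
// assumed here purely for illustration.
//
//   import java.io.{ByteArrayOutputStream, PrintStream}
//   val out = new ByteArrayOutputStream()
//   SparkSubmit.printStream = new PrintStream(out)
//   SparkSubmit.exitFn = () => throw new IllegalStateException("exit")
//   intercept[IllegalStateException] {
//     SparkSubmit.printErrorAndExit("bad arg")
//   }
//   assert(out.toString.contains("Error: bad arg"))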

def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
@@ -55,88 +69,80 @@ object SparkSubmit
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}

// Exposed for testing
private[spark] var printStream: PrintStream = System.err
private[spark] var exitFn: () => Unit = () => System.exit(-1)

private[spark] def printErrorAndExit(str: String) = {
printStream.println("Error: " + str)
printStream.println("Run with --help for usage help or --verbose for debug output")
exitFn()
}
private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)

/**
* @return a tuple containing the arguments for the child, a list of classpath
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
* @return a tuple containing
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a list of system properties and env vars, and
* (4) the main class for the child
*/
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
if (args.master.startsWith("local")) {
clusterManager = LOCAL
} else if (args.master.startsWith("yarn")) {
clusterManager = YARN
} else if (args.master.startsWith("spark")) {
clusterManager = STANDALONE
} else if (args.master.startsWith("mesos")) {
clusterManager = MESOS
} else {
printErrorAndExit("Master must start with yarn, mesos, spark, or local")
}

// Because "yarn-cluster" and "yarn-client" encapsulate both the master
// and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
if (args.deployMode == null &&
(args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
args.deployMode = "cluster"
}
if (args.deployMode == "cluster" && args.master == "yarn-client") {
printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
}
if (args.deployMode == "client" &&
(args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
printErrorAndExit("Deploy mode \"client\" and master \"" + args.master
+ "\" are not compatible")
}
if (args.deployMode == "cluster" && args.master.startsWith("yarn")) {
args.master = "yarn-cluster"
}
if (args.deployMode != "cluster" && args.master.startsWith("yarn")) {
args.master = "yarn-client"
}

val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster"

val childClasspath = new ArrayBuffer[String]()
// Values to return
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
var childMainClass = ""

val isPython = args.isPython
val isYarnCluster = clusterManager == YARN && deployOnCluster
// Set the cluster manager
val clusterManager: Int = args.master match {
case m if m.startsWith("yarn") => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
}
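// For reference, a few examples of how master strings map through the
// prefixes above (standard Spark master URL formats; hosts are made up):
//
//   "local", "local[4]"    => LOCAL
//   "spark://host:7077"    => STANDALONE
//   "mesos://host:5050"    => MESOS
//   "yarn-client"          => YARN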

// For mesos, only client mode is supported
if (clusterManager == MESOS && deployOnCluster) {
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
// Set the deploy mode; default is client mode
var deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
}

// For standalone, only client mode is supported
if (clusterManager == STANDALONE && deployOnCluster) {
printErrorAndExit("Cluster deploy mode is currently not supported for standalone clusters.")
// Because "yarn-cluster" and "yarn-client" encapsulate both the master
// and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
if (clusterManager == YARN) {
if (args.master == "yarn-standalone") {
printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
args.master = "yarn-cluster"
}
(args.master, args.deployMode) match {
case ("yarn-cluster", null) =>
deployMode = CLUSTER
case ("yarn-cluster", "client") =>
printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
case ("yarn-client", "cluster") =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
case (_, mode) =>
args.master = "yarn-" + Option(mode).getOrElse("client")
}
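// To make the inference above concrete, the main cases play out as follows:
//
//   ("yarn-cluster", null)     => deployMode becomes CLUSTER
//   ("yarn", null)             => master becomes "yarn-client"
//   ("yarn", "cluster")        => master becomes "yarn-cluster"
//   ("yarn-client", "cluster") => exits early: master and mode are at odds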

// Make sure YARN is included in our build if we're trying to use it
if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
printErrorAndExit(
"Could not load YARN classes. " +
"This copy of Spark may not have been compiled with YARN support.")
}
}

// For shells, only client mode is applicable
if (isShell(args.primaryResource) && deployOnCluster) {
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
case (STANDALONE, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
case (_, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case _ =>
}

// If we're running a python app, set the main class to our specific python runner
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cluster deploy mode is currently not supported for python.")
}
if (args.isPython) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
@@ -152,120 +158,115 @@ object SparkSubmit
sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
}

// If we're deploying into YARN, use yarn.Client as a wrapper around the user class
if (!deployOnCluster) {
childMainClass = args.mainClass
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
} else if (clusterManager == YARN) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
childArgs += ("--jar", args.primaryResource)
childArgs += ("--class", args.mainClass)
}

// Make sure YARN is included in our build if we're trying to use it
if (clusterManager == YARN) {
if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
printErrorAndExit("Could not load YARN classes. " +
"This copy of Spark may not have been compiled with YARN support.")
}
}

// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,

// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),

// Standalone cluster only
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

// Yarn cluster only
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),

// Other options
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
sysProp = "spark.executor.memory"),
OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.files, YARN, true, clOption = "--files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files")
)

// For client mode make any added jars immediately visible on the classpath
if (args.jars != null && !deployOnCluster) {
for (jar <- args.jars.split(",")) {
childClasspath += jar
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
if (args.jars != null) { childClasspath ++= args.jars.split(",") }
if (args.childArgs != null) { childArgs ++= args.childArgs }
}


// Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
if (opt.value != null &&
(deployMode & opt.deployMode) != 0 &&
(clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) {
childArgs += (opt.clOption, opt.value)
}
if (opt.sysProp != null) {
sysProps.put(opt.sysProp, opt.value)
}
if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
}
}
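// A worked example of the loop above (hypothetical values): submitting with
// name "myApp" and executor memory "2g" against a yarn-cluster master gives
// clusterManager == YARN and deployMode == CLUSTER, so among others these
// rules fire:
//
//   OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name")
//     => childArgs += ("--name", "myApp"); sysProps("spark.app.name") = "myApp"
//   OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory")
//     => childArgs += ("--executor-memory", "2g")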

// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
if (!isYarnCluster && !args.isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
}
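// For example (hypothetical values): with jars "a.jar,b.jar" and primary
// resource "app.jar" in standalone client mode, the merge above yields
//   sysProps("spark.jars") == "a.jar,b.jar,app.jar"
// so the application jar reaches executors without an explicit sc.addJar.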

// Standalone cluster specific configurations
if (deployOnCluster && clusterManager == STANDALONE) {
// In standalone-cluster mode, use Client as a wrapper around the user class
if (clusterManager == STANDALONE && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) {
childArgs += "--supervise"
}
childMainClass = "org.apache.spark.deploy.Client"
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}

// Arguments to be passed to user program
if (args.childArgs != null) {
if (!deployOnCluster || clusterManager == STANDALONE) {
childArgs ++= args.childArgs
} else if (clusterManager == YARN) {
for (arg <- args.childArgs) {
childArgs += ("--arg", arg)
}
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
childArgs += ("--jar", args.primaryResource)
childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}

// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
if (!sysProps.contains(k)) sysProps(k) = v
sysProps.getOrElseUpdate(k, v)
}
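// getOrElseUpdate only inserts keys that are still absent, so anything set
// explicitly above always wins over the defaults file. A tiny sketch:
//
//   sysProps("spark.executor.memory") = "4g"                    // explicit
//   sysProps.getOrElseUpdate("spark.executor.memory", "1g")     // stays "4g"
//   sysProps.getOrElseUpdate("spark.eventLog.enabled", "true")  // newly added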

(childArgs, childClasspath, sysProps, childMainClass)
@@ -364,6 +365,6 @@ object SparkSubmit
private[spark] case class OptionAssigner(
value: String,
clusterManager: Int,
deployOnCluster: Boolean,
deployMode: Int,
clOption: String = null,
sysProp: String = null)
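// A hedged usage sketch of the case class above, mirroring the matching test
// in createLaunchEnv (values are hypothetical):
//
//   val opt = OptionAssigner("2g", STANDALONE | MESOS | YARN, CLIENT,
//     sysProp = "spark.executor.memory")
//   val applies = opt.value != null &&
//     (CLIENT & opt.deployMode) != 0 &&
//     (STANDALONE & opt.clusterManager) != 0   // true for standalone client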