Skip to content

Commit

Permalink
SPARK-2664. Deal with --conf options in spark-submit that relate to…
Browse files Browse the repository at this point in the history
… flags
  • Loading branch information
sryza committed Jul 30, 2014
1 parent fc47bb6 commit 0518c63
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ object SparkSubmit {
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

// Yarn cluster only
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
Expand Down Expand Up @@ -268,14 +268,17 @@ object SparkSubmit {
}
}

// Properties given with --conf are superseded by other options, but take precedence over
// properties in the defaults file.
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
}

// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
sysProps.getOrElseUpdate(k, v)
}

// Spark properties included on command line take precedence
sysProps ++= args.sparkProperties

(childArgs, childClasspath, sysProps, childMainClass)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

parseOpts(args.toList)
loadDefaults()
mergeSparkProperties()
checkRequiredArguments()

/** Return default present in the currently defined defaults file. */
Expand All @@ -79,9 +79,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
defaultProperties
}

/** Fill in any undefined values based on the current properties file or built-in defaults. */
private def loadDefaults(): Unit = {

/**
* Fill in any undefined values based on the default properties file or options passed in through
* the '--conf' flag.
*/
private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
sys.env.get("SPARK_HOME").foreach { sparkHome =>
Expand All @@ -94,18 +96,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
}

val defaultProperties = getDefaultSparkProperties
val properties = getDefaultSparkProperties
properties.putAll(sparkProperties)

// Use properties file as fallback for values which have a direct analog to
// arguments in this script.
master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
master = Option(master).getOrElse(properties.get("spark.master").orNull)
executorMemory = Option(executorMemory)
.getOrElse(defaultProperties.get("spark.executor.memory").orNull)
.getOrElse(properties.get("spark.executor.memory").orNull)
executorCores = Option(executorCores)
.getOrElse(defaultProperties.get("spark.executor.cores").orNull)
.getOrElse(properties.get("spark.executor.cores").orNull)
totalExecutorCores = Option(totalExecutorCores)
.getOrElse(defaultProperties.get("spark.cores.max").orNull)
name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
.getOrElse(properties.get("spark.cores.max").orNull)
name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)

// This supports env vars in older versions of Spark
master = Option(master).getOrElse(System.getenv("MASTER"))
Expand Down
16 changes: 16 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,22 @@ class SparkSubmitSuite extends FunSuite with Matchers {
sysProps("spark.shuffle.spill") should be ("false")
}

// Verifies that --conf values which duplicate a dedicated CLI flag are superseded by
// that flag: --executor-memory 5g must win over --conf spark.executor.memory=4g, and
// the cluster deploy mode must rewrite --conf spark.master=yarn to yarn-cluster.
test("handles confs with flag equivalents") {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--executor-memory", "5g",
"--class", "org.SomeClass",
"--conf", "spark.executor.memory=4g",
"--conf", "spark.master=yarn",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, sysProps, mainClass) = createLaunchEnv(appArgs)
// The explicit flag (5g) takes precedence over the --conf value (4g).
sysProps("spark.executor.memory") should be ("5g")
// yarn + cluster deploy mode is normalized to the yarn-cluster master.
sysProps("spark.master") should be ("yarn-cluster")
// yarn-cluster submissions are launched via the YARN Client main class.
mainClass should be ("org.apache.spark.deploy.yarn.Client")
}

test("launch simple application with spark-submit") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
Expand Down

0 comments on commit 0518c63

Please sign in to comment.