SPARK-2664. Deal with --conf options in spark-submit that relate to fl... #1665

Closed
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -184,7 +184,7 @@ object SparkSubmit {
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

// Yarn cluster only
-OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
Contributor
Why does this one change?

Contributor Author
The spark.app.name already gets set above with:
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
so this is just removing a redundancy.

For reasons that are too boring to be worth going into, this was more thematically affiliated to the rest of the changes before I rebased it. Not strictly related to this patch anymore, so I can fix it separately if you'd like.

Contributor

No problem, it's fine to have it here, I was just not sure why it was needed.

Contributor

It was first added in #699 to fix a YARN app name issue, but in #1538 we made spark.app.name available for all deploy modes and all cluster managers, which includes yarn-cluster. With these changes, setting spark.app.name again here becomes redundant.

OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
@@ -268,14 +268,17 @@ object SparkSubmit {
}
}

+// Properties given with --conf are superceded by other options, but take precedence over
+// properties in the defaults file.
+for ((k, v) <- args.sparkProperties) {
+  sysProps.getOrElseUpdate(k, v)
+}
+
// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
  sysProps.getOrElseUpdate(k, v)
}

-// Spark properties included on command line take precedence
-sysProps ++= args.sparkProperties
-
(childArgs, childClasspath, sysProps, childMainClass)
}
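The reordering above is what gives --conf its middle precedence: sysProps has already been populated from explicit flags by the OptionAssigners, and each subsequent getOrElseUpdate only sets keys that are still missing. A minimal, self-contained sketch of that merge order (the maps, values, and object name are illustrative stand-ins, not Spark's actual code):

```scala
import scala.collection.mutable

object ConfPrecedenceSketch extends App {
  // Illustrative inputs; in spark-submit these come from the parsed arguments.
  val fromFlags = Map("spark.executor.memory" -> "5g")                  // e.g. --executor-memory 5g
  val fromConf  = Map("spark.executor.memory" -> "4g",
                      "spark.master"          -> "yarn")                // e.g. --conf key=value
  val fromFile  = Map("spark.master"   -> "local[4]",
                      "spark.app.name" -> "defaults-app")               // spark-defaults.conf

  val sysProps = mutable.HashMap[String, String]()
  sysProps ++= fromFlags                                   // explicit flags are assigned first
  for ((k, v) <- fromConf) sysProps.getOrElseUpdate(k, v)  // --conf fills whatever is still unset
  for ((k, v) <- fromFile) sysProps.getOrElseUpdate(k, v)  // defaults file is the last resort

  // Prints: 5g yarn defaults-app
  println(Seq("spark.executor.memory", "spark.master", "spark.app.name").map(sysProps).mkString(" "))
}
```

With this ordering, an explicit --executor-memory 5g still beats --conf spark.executor.memory=4g, which is what the new test at the bottom of this diff checks.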

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -58,7 +58,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

parseOpts(args.toList)
-loadDefaults()
+mergeSparkProperties()
checkRequiredArguments()

/** Return default present in the currently defined defaults file. */
@@ -79,9 +79,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
defaultProperties
}

-/** Fill in any undefined values based on the current properties file or built-in defaults. */
-private def loadDefaults(): Unit = {
-
+/**
+ * Fill in any undefined values based on the default properties file or options passed in through
+ * the '--conf' flag.
+ */
+private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
sys.env.get("SPARK_HOME").foreach { sparkHome =>
@@ -94,18 +94,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
}

-val defaultProperties = getDefaultSparkProperties
+val properties = getDefaultSparkProperties
+properties.putAll(sparkProperties)
+
// Use properties file as fallback for values which have a direct analog to
// arguments in this script.
-master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+master = Option(master).getOrElse(properties.get("spark.master").orNull)
executorMemory = Option(executorMemory)
-  .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+  .getOrElse(properties.get("spark.executor.memory").orNull)
executorCores = Option(executorCores)
-  .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+  .getOrElse(properties.get("spark.executor.cores").orNull)
totalExecutorCores = Option(totalExecutorCores)
-  .getOrElse(defaultProperties.get("spark.cores.max").orNull)
-name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
-jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
+  .getOrElse(properties.get("spark.cores.max").orNull)
+name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
+jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)

// This supports env vars in older versions of Spark
master = Option(master).getOrElse(System.getenv("MASTER"))
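On the SparkSubmitArguments side, mergeSparkProperties() overlays the --conf entries on top of the defaults file and then consults the merged map only as a fallback for flags the user did not set. A rough model of that behavior, with made-up values standing in for the parsed arguments:

```scala
import scala.collection.mutable

object MergePropertiesSketch extends App {
  // Illustrative stand-ins for the defaults file and the --conf entries.
  val defaultsFile = Map("spark.master" -> "local[4]", "spark.executor.memory" -> "2g")
  val confEntries  = Map("spark.executor.memory" -> "4g")          // from --conf

  // --conf overrides the defaults file, mirroring properties.putAll(sparkProperties) above.
  val properties = mutable.HashMap[String, String]() ++= defaultsFile ++= confEntries

  var executorMemory: String = "5g"   // pretend --executor-memory 5g was given
  var master: String = null           // no --master flag

  // Flags win; the merged properties only fill what is still null.
  executorMemory = Option(executorMemory).getOrElse(properties.get("spark.executor.memory").orNull)
  master = Option(master).getOrElse(properties.get("spark.master").orNull)

  println(s"executorMemory=$executorMemory master=$master")  // executorMemory=5g master=local[4]
}
```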
16 changes: 16 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -253,6 +253,22 @@ class SparkSubmitSuite extends FunSuite with Matchers {
sysProps("spark.shuffle.spill") should be ("false")
}

test("handles confs with flag equivalents") {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--executor-memory", "5g",
"--class", "org.SomeClass",
"--conf", "spark.executor.memory=4g",
"--conf", "spark.master=yarn",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, sysProps, mainClass) = createLaunchEnv(appArgs)
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.master") should be ("yarn-cluster")
mainClass should be ("org.apache.spark.deploy.yarn.Client")
}

test("launch simple application with spark-submit") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(