Correctly propagate SPARK_JAVA_OPTS to driver/executor.
Users expected it to be possible to set spark.* config options
using SPARK_JAVA_OPTS, but that's not possible when trying to
propagate the env variable using spark.*.extraJavaOptions. So
instead, in Yarn mode, propagate the env variable itself.

Also make sure that, in cluster mode, the warning about SPARK_JAVA_OPTS
being deprecated is printed to the logs.
Marcelo Vanzin committed Jun 19, 2014
1 parent 6a454ea commit 4e7f066
Showing 4 changed files with 42 additions and 6 deletions.
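
A minimal sketch of the approach, before the diffs (the helper name and the mutable map are illustrative, not actual ClientBase API): rather than rewriting SPARK_JAVA_OPTS into a spark.*.extraJavaOptions entry, the client copies the variable verbatim into the launch environment of the containers it requests from YARN.

    import scala.collection.mutable

    // Illustrative sketch: forward the deprecated variable straight into the
    // container launch environment instead of translating it into a spark.*
    // configuration entry that the driver's SparkContext would reject.
    def propagateSparkJavaOpts(env: mutable.HashMap[String, String]): Unit = {
      sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
        env("SPARK_JAVA_OPTS") = value
      }
    }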
@@ -178,7 +178,6 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
 
     val sparkConf = new SparkConf
-    sparkConf.validateSettings()
 
     try {
       val args = new ClientArguments(argStrings, sparkConf)
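
(Both Client entry points drop this early validateSettings() call; per the commit message, the SPARK_JAVA_OPTS deprecation warning it would have produced is duplicated in ClientBase below, so cluster-mode users still see it in the logs.)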
@@ -294,8 +294,6 @@ trait ClientBase extends Logging {
       // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
 
-    logInfo(s"ApplicationMaster environment: $env")
-
     env
   }
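
(The environment logging removed here is subsumed by the consolidated launch-context logging added further down, which prints the AM class, environment, and command together.)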

@@ -320,6 +318,37 @@ trait ClientBase extends Logging {
logInfo("Setting up container launch context")
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)

// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
// executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
// SparkContext will not let that set spark* system properties, which is expected behavior for
// Yarn clients. So propagate it through the environment.
//
// Note that to warn the user about the deprecation in cluster mode, some code from
// SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
// described above).
if (args.amClass == classOf[ApplicationMaster].getName) {
sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
val warning =
s"""
|SPARK_JAVA_OPTS was detected (set to '$value').
|This is deprecated in Spark 1.0+.
|
|Please instead use:
| - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
| - ./spark-submit with --driver-java-options to set -X options for a driver
| - spark.executor.extraJavaOptions to set -X options for executors
""".stripMargin
logWarning(warning)
for (proc <- Seq("driver", "executor")) {
val key = s"spark.$proc.extraJavaOptions"
if (sparkConf.contains(key)) {
throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
}
}
env("SPARK_JAVA_OPTS") = value
}
}
amContainer.setEnvironment(env)

val amMemory = calculateAMMemory(newApp)
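
The added block, restated as a hedged standalone sketch (checkSparkJavaOpts and its parameters are hypothetical stand-ins for sparkConf and sys.env.get("SPARK_JAVA_OPTS")): a lone SPARK_JAVA_OPTS is warned about and forwarded, while combining it with either extraJavaOptions key fails fast.

    // Hypothetical stand-in for the cluster-mode check above; `conf` plays the
    // role of sparkConf, `envOpts` the role of sys.env.get("SPARK_JAVA_OPTS").
    def checkSparkJavaOpts(conf: Map[String, String], envOpts: Option[String]): Option[String] = {
      envOpts.foreach { value =>
        for (proc <- Seq("driver", "executor")) {
          val key = s"spark.$proc.extraJavaOptions"
          require(!conf.contains(key), s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
        }
      }
      envOpts
    }

    checkSparkJavaOpts(Map.empty, Some("-verbose:gc")) // Some("-verbose:gc"), forwarded to env
    // checkSparkJavaOpts(Map("spark.driver.extraJavaOptions" -> "-Xss2m"), Some("-verbose:gc"))
    //   would throw, mirroring the SparkException above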
@@ -357,8 +386,11 @@ trait ClientBase extends Logging {
     for ((k, v) <- sparkConf.getAll) {
       javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
     }
+
     if (args.amClass == classOf[ApplicationMaster].getName) {
-      sparkConf.getOption("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts)
+      sparkConf.getOption("spark.driver.extraJavaOptions")
+        .orElse(sys.env.get("SPARK_JAVA_OPTS"))
+        .foreach(opts => javaOpts += opts)
       sparkConf.getOption("spark.driver.libraryPath")
         .foreach(p => javaOpts += s"-Djava.library.path=$p")
     }
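
Note the precedence of the orElse chain: an explicitly configured spark.driver.extraJavaOptions still wins, and the deprecated variable fills in only when the conf key is absent. With hypothetical values:

    // Option.orElse keeps the first defined value.
    val fromConf: Option[String] = Some("-XX:+UseG1GC") // spark.driver.extraJavaOptions
    val fromEnv: Option[String] = Some("-verbose:gc")   // SPARK_JAVA_OPTS
    assert(fromConf.orElse(fromEnv) == Some("-XX:+UseG1GC")) // explicit conf wins
    assert(None.orElse(fromEnv) == Some("-verbose:gc"))      // env var as fallback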
@@ -374,7 +406,10 @@ trait ClientBase extends Logging {
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

logInfo("Command for starting the Spark ApplicationMaster: " + commands)
logInfo("Yarn AM launch context:")
logInfo(s" class: ${args.amClass}")
logInfo(s" env: $env")
logInfo(s" command: ${commands.mkString(" ")}")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
@@ -55,6 +55,9 @@ trait ExecutorRunnableUtil extends Logging {
     sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
       javaOpts += opts
     }
+    sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
+      javaOpts += opts
+    }
 
     javaOpts += "-Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
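
On the executor side, SPARK_JAVA_OPTS arrives via the container environment populated by the AM code above and is appended after any spark.executor.extraJavaOptions. A sketch of the resulting ordering (values hypothetical; when a flag is repeated this way, HotSpot typically honors the last occurrence):

    import scala.collection.mutable.ListBuffer

    // Both sources end up on the executor's JVM command line, env var last.
    val javaOpts = ListBuffer[String]()
    Option("-Xmx2g").foreach(javaOpts += _) // stand-in for spark.executor.extraJavaOptions
    Option("-Xmx1g").foreach(javaOpts += _) // stand-in for SPARK_JAVA_OPTS from the env
    assert(javaOpts.mkString(" ") == "-Xmx2g -Xmx1g")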
@@ -185,7 +185,6 @@ object Client {
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf()
-    sparkConf.validateSettings()
 
     try {
       val args = new ClientArguments(argStrings, sparkConf)