Fix cluster mode, restore SPARK_LOG4J_CONF.
Also add documentation about logging to the Yarn guide.

For cluster mode, the change modifies code added in fb98488 so that client and
cluster modes are treated mostly the same. Previously, cluster mode only
forwarded system properties whose names started with "spark", which caused it
to ignore anything that SparkSubmit set directly on the SparkConf object.
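
As a minimal sketch of the difference (the key and value below are made up): filtering JVM system properties by prefix misses anything written straight into the SparkConf, while iterating the SparkConf itself picks it up.

```scala
import org.apache.spark.SparkConf

// Made-up example: SparkSubmit writes a value directly into the SparkConf object
// without exporting it as a JVM system property.
val sparkConf = new SparkConf(loadDefaults = false).set("spark.yarn.queue", "default")

// Old cluster-mode behaviour: forward only system properties whose names start
// with "spark"; the entry above never shows up here.
val forwardedOld = sys.props.filterKeys(_.startsWith("spark")).toMap

// New behaviour (shared with client mode): forward everything held by the SparkConf.
val forwardedNew = sparkConf.getAll.toMap  // contains "spark.yarn.queue" -> "default"
```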
Marcelo Vanzin committed Jun 19, 2014
1 parent 1dfbb40 commit e5c682d
Showing 2 changed files with 32 additions and 18 deletions.
15 changes: 14 additions & 1 deletion docs/running-on-yarn.md
@@ -165,7 +165,20 @@ all environment variables used for launching each container. This process is use
classpath problems in particular. (Note that enabling this requires admin privileges on cluster
settings and a restart of all node managers. Thus, this is not applicable to hosted clusters).

# Important Notes
To use a custom log4j configuration for the application master or executors, there are two options:

- upload a custom log4j.properties using spark-submit, by adding it to the "--files" list of files
to be uploaded with the application.
- add "-Dlog4j.configuration=<location of configuration file>" to "spark.driver.extraJavaOptions"
(for the driver) or "spark.executor.extraJavaOptions" (for executors). Note that if using a file,
the "file:" protocol should be explicitly provided, and the file needs to exist locally on all
the nodes.

Note that for the first option, both executors and the application master will share the same
log4j configuration, which may cause issues when they run on the same node (e.g. trying to write
to the same log file).
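
For the second option, the same properties can also be set programmatically on a SparkConf. A minimal sketch, assuming a placeholder path that already exists locally on every node:

```scala
import org.apache.spark.SparkConf

// Placeholder path; note the explicit "file:" scheme and that the file has to
// be present locally on every node (driver and executors alike).
val log4jOpt = "-Dlog4j.configuration=file:/etc/spark/log4j.properties"

val conf = new SparkConf()
  .set("spark.driver.extraJavaOptions", log4jOpt)    // application master / driver
  .set("spark.executor.extraJavaOptions", log4jOpt)  // executors
```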

# Important notes

- Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the numbers of cores given via command line arguments cannot be passed to YARN. Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
- The local directories used by Spark executors will be the local directories configured for YARN (Hadoop YARN config `yarn.nodemanager.local-dirs`). If the user specifies `spark.local.dir`, it will be ignored.
35 changes: 18 additions & 17 deletions ClientBase.scala
@@ -213,9 +213,18 @@ trait ClientBase extends Logging {

val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()

val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF"))
if (oldLog4jConf.isDefined) {
logWarning(
"SPARK_LOG4J_CONF detected in the system environment. This variable has been " +
"deprecated. Please refer to the \"Launching Spark on YARN\" documentation " +
"for alternatives.")
}

List(
(ClientBase.SPARK_JAR, ClientBase.sparkJar(sparkConf), ClientBase.CONF_SPARK_JAR),
(ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR)
(ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR),
("log4j.properties", oldLog4jConf.getOrElse(null), null)
).foreach { case(destName, _localPath, confKey) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
if (! localPath.isEmpty()) {
@@ -225,7 +234,7 @@ trait ClientBase extends Logging {
val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
destName, statCache)
} else {
} else if (confKey != null) {
sparkConf.set(confKey, localPath)
}
}
@@ -348,20 +357,13 @@ trait ClientBase extends Logging {
sparkConf.set("spark.driver.extraJavaOptions", opts)
}

// Forward the Spark configuration to the application master / executors.
// TODO: it might be nicer to pass these as an internal environment variable rather than
// as Java options, due to complications with string parsing of nested quotes.
if (args.amClass == classOf[ExecutorLauncher].getName) {
// If we are being launched in client mode, forward the spark-conf options
// onto the executor launcher
for ((k, v) <- sparkConf.getAll) {
javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
}
} else {
// If we are being launched in standalone mode, capture and forward any spark
// system properties (e.g. set by spark-class).
for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
}
for ((k, v) <- sparkConf.getAll) {
javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
}
if (args.amClass == classOf[ApplicationMaster].getName) {
sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts)
sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p")
}
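
The escaping in the loop above wraps each value in literal backslash-quote pairs; a standalone sketch with made-up entries shows the strings that end up in javaOpts:

```scala
// Standalone sketch with made-up entries; mirrors the forwarding loop above.
val entries = Seq("spark.app.name" -> "My App", "spark.executor.memory" -> "2g")
val javaOpts = entries.map { case (k, v) => "-D" + k + "=" + "\\\"" + v + "\\\"" }
javaOpts.foreach(println)
// Prints:
//   -Dspark.app.name=\"My App\"
//   -Dspark.executor.memory=\"2g\"
// The escaped quotes are what the TODO about nested-quote parsing refers to.
```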
@@ -522,11 +524,10 @@ object ClientBase extends Logging {
}
}
} else {
val userJar = conf.getOption(CONF_SPARK_USER_JAR).getOrElse(null)
val userJar = conf.get(CONF_SPARK_USER_JAR, null)
addFileToClasspath(userJar, APP_JAR, env)

val cachedSecondaryJarLinks =
conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
val cachedSecondaryJarLinks = conf.get(CONF_SPARK_YARN_SECONDARY_JARS, "").split(",")
cachedSecondaryJarLinks.foreach(jar => addFileToClasspath(jar, null, env))
}
}
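
The last hunk is a readability cleanup: SparkConf.get with a default value replaces the getOption/getOrElse spelling. A minimal sketch of the equivalence, with an illustrative key name standing in for CONF_SPARK_USER_JAR:

```scala
import org.apache.spark.SparkConf

// "spark.yarn.user.jar" is only illustrative here.
val conf = new SparkConf(loadDefaults = false)
val viaDefault = conf.get("spark.yarn.user.jar", null)        // style used after this change
val viaOption  = conf.getOption("spark.yarn.user.jar").orNull // equivalent, pre-change spelling
assert(viaDefault == viaOption)
```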
