From 1590141ee753a7fb7b0358a8ef73fef553a1e16d Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 16 Sep 2014 18:20:09 -0700
Subject: [PATCH] Address review comments

---
 .../org/apache/spark/deploy/yarn/Client.scala    |  4 ++-
 .../spark/deploy/yarn/ClientArguments.scala      | 15 ++++-----
 .../apache/spark/deploy/yarn/ClientBase.scala    | 32 +++++++++++++------
 .../deploy/yarn/YarnSparkHadoopUtil.scala        |  8 +++--
 .../cluster/YarnClientSchedulerBackend.scala     | 12 ++++---
 .../org/apache/spark/deploy/yarn/Client.scala    |  4 ++-
 6 files changed, 48 insertions(+), 27 deletions(-)

diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0e8372992cfb7..5a20532315e59 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -125,7 +125,7 @@ private[spark] class Client(
     Option(report.getClientToken).getOrElse("")
   }
 
-private[spark] object Client {
+object Client {
   def main(argStrings: Array[String]) {
     if (!sys.props.contains("SPARK_SUBMIT")) {
       println("WARNING: This client is deprecated and will be removed in a " +
@@ -145,5 +145,7 @@ private[spark] object Client {
         Console.err.println(e.getMessage)
         System.exit(1)
     }
+
+    System.exit(0)
   }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index b66f177218219..201b742736c6e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -47,22 +47,20 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
     "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
 
   parseArgs(args.toList)
-  loadDefaultArgs()
+  loadEnvironmentArgs()
   validateArgs()
 
   /** Load any default arguments provided through environment variables and Spark properties. */
-  private def loadDefaultArgs(): Unit = {
+  private def loadEnvironmentArgs(): Unit = {
     // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
     // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
-    files = Option(files).orElse(sys.env.get("SPARK_YARN_DIST_FILES")).orNull
     files = Option(files)
-      .orElse(sparkConf.getOption("spark.yarn.dist.files"))
-      .map(p => Utils.resolveURIs(p))
+      .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
+      .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)))
       .orNull
-    archives = Option(archives).orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")).orNull
     archives = Option(archives)
-      .orElse(sparkConf.getOption("spark.yarn.dist.archives"))
-      .map(p => Utils.resolveURIs(p))
+      .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
+      .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
       .orNull
   }
 
@@ -71,6 +69,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
    * This is intended to be called only after the provided arguments have been parsed.
   */
  private def validateArgs(): Unit = {
+    // TODO: memory checks are outdated (SPARK-3476)
    Map[Boolean, String](
      (numExecutors <= 0) -> "You must specify at least 1 executor!",
      (amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB",
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 5dd37accaf2f9..e874ed7e3dada 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -36,7 +36,9 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
+
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.util.Utils
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN.
@@ -217,7 +219,6 @@ private[spark] trait ClientBase extends Logging {
       sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
     }
 
-    UserGroupInformation.getCurrentUser().addCredentials(credentials)
     localResources
   }
 
@@ -360,9 +361,9 @@ private[spark] trait ClientBase extends Logging {
     }
     val amClass =
       if (isLaunchingDriver) {
-        classOf[ApplicationMaster].getName()
+        Utils.getFormattedClassName(ApplicationMaster)
       } else {
-        classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
+        Utils.getFormattedClassName(ExecutorLauncher)
       }
     val userArgs = args.userArgs.flatMap { arg =>
       Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
     }
@@ -400,6 +401,8 @@ private[spark] trait ClientBase extends Logging {
     val securityManager = new SecurityManager(sparkConf)
     amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
     setupSecurityToken(amContainer)
+    UserGroupInformation.getCurrentUser().addCredentials(credentials)
+
     amContainer
   }
 
@@ -409,22 +412,24 @@ private[spark] trait ClientBase extends Logging {
    *
    * @param returnOnRunning Whether to also return the application state when it is RUNNING.
    * @param logApplicationReport Whether to log details of the application report every iteration.
+   * @param shouldKeepMonitoring The condition to keep monitoring.
    * @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING.
   */
  def monitorApplication(
      appId: ApplicationId,
      returnOnRunning: Boolean = false,
-      logApplicationReport: Boolean = true): YarnApplicationState = {
+      logApplicationReport: Boolean = true,
+      shouldKeepMonitoring: () => Boolean = () => true): YarnApplicationState = {
    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-    while (true) {
+    var firstIteration = true
+    while (shouldKeepMonitoring()) {
      Thread.sleep(interval)
      val report = getApplicationReport(appId)
      val state = report.getYarnApplicationState
      if (logApplicationReport) {
-        logInfo(s"Application report from ResourceManager for application ${appId.getId} " +
-          s"(state: $state)")
-        logDebug("\n" +
+        logInfo(s"Application report from ResourceManager for app ${appId.getId} (state: $state)")
+        val details = "\n" +
          s"\t full application identifier: $appId\n" +
          s"\t clientToken: ${getClientToken(report)}\n" +
          s"\t appDiagnostics: ${report.getDiagnostics}\n" +
@@ -435,7 +440,14 @@ private[spark] trait ClientBase extends Logging {
          s"\t yarnAppState: $state\n" +
          s"\t distributedFinalState: ${report.getFinalApplicationStatus}\n" +
          s"\t appTrackingUrl: ${report.getTrackingUrl}\n" +
-          s"\t appUser: ${report.getUser}")
+          s"\t appUser: ${report.getUser}"
+
+        // Log report details every iteration if DEBUG is enabled, otherwise only the first
+        if (log.isDebugEnabled) {
+          logDebug(details)
+        } else if (firstIteration) {
+          logInfo(details)
+        }
      }
 
      if (state == YarnApplicationState.FINISHED ||
@@ -447,6 +459,8 @@ private[spark] trait ClientBase extends Logging {
      if (returnOnRunning && state == YarnApplicationState.RUNNING) {
        return state
      }
+
+      firstIteration = false
    }
    // Never reached, but keeps compiler happy
    throw new SparkException("While loop is depleted! This should never happen...")
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 517b2597e3212..0b712c201904a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.Utils
 /**
  * Contains util methods to interact with Hadoop from spark.
  */
-private[spark] class YarnSparkHadoopUtil extends SparkHadoopUtil {
+class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
    dest.addCredentials(source.getCredentials())
  }
@@ -83,7 +83,7 @@ private[spark] class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
 }
 
-private[spark] object YarnSparkHadoopUtil {
+object YarnSparkHadoopUtil {
   // Additional memory overhead - in mb.
  val DEFAULT_MEMORY_OVERHEAD = 384
 
@@ -136,7 +136,9 @@ private[spark] object YarnSparkHadoopUtil {
        m.appendReplacement(sb, Matcher.quoteReplacement(replace))
      }
      m.appendTail(sb)
-      env(parts(0)) = sb.toString
+      // This treats the environment variable as a path variable delimited by `File.pathSeparator`
+      // This is kept for backward compatibility and consistency with Hadoop's behavior
+      addPathToEnvironment(env, parts(0), sb.toString)
    }
  }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 35b2f4212bcd7..77f40ba5fc785 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -34,10 +34,11 @@ private[spark] class YarnClientSchedulerBackend(
    minRegisteredRatio = 0.8
  }
 
-  var client: Client = null
-  var appId: ApplicationId = null
-  var stopping: Boolean = false
-  var totalExpectedExecutors = 0
+  private var client: Client = null
+  private var appId: ApplicationId = null
+  private var stopping: Boolean = false
+  private var totalExpectedExecutors = 0
+  private def isStopping(): Boolean = stopping
 
  /**
   * Create a Yarn client to submit an application to the ResourceManager.
@@ -120,7 +121,8 @@ private[spark] class YarnClientSchedulerBackend(
    assert(client != null && appId != null, "Application has not been submitted yet!")
    val t = new Thread {
      override def run() {
-        val state = client.monitorApplication(appId, logApplicationReport = false) // blocking
+        val state = client.monitorApplication(
+          appId, logApplicationReport = false, shouldKeepMonitoring = () => !isStopping()) // blocking
        if (state == YarnApplicationState.FINISHED ||
          state == YarnApplicationState.KILLED ||
          state == YarnApplicationState.FAILED) {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 25ace7ca33ff5..0b43e6ee20538 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -123,7 +123,7 @@ private[spark] class Client(
    Option(report.getClientToAMToken).map(_.toString).getOrElse("")
  }
 
-private[spark] object Client {
+object Client {
  def main(argStrings: Array[String]) {
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a " +
@@ -143,5 +143,7 @@ private[spark] object Client {
        Console.err.println(e.getMessage)
        System.exit(1)
    }
+
+    System.exit(0)
  }
 }
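
Note (not part of the patch): the central API change above is that ClientBase.monitorApplication now accepts a shouldKeepMonitoring: () => Boolean predicate, which YarnClientSchedulerBackend supplies so the background monitor thread keeps polling only while the backend has not started shutting down. The sketch below is a minimal, self-contained illustration of that same "poll until a predicate says stop or a terminal state is reached" pattern; the names monitor, poll, and isTerminal are illustrative and do not exist in the Spark code.

// MonitorSketch.scala -- illustrative only, assumes nothing from Spark or YARN.
object MonitorSketch {
  // Polls `poll` every `intervalMs` milliseconds. Stops either when the caller's
  // predicate says monitoring should end (returns None) or when the polled state
  // is terminal (returns Some(state)).
  def monitor(
      intervalMs: Long,
      shouldKeepMonitoring: () => Boolean,
      poll: () => String,
      isTerminal: String => Boolean): Option[String] = {
    while (shouldKeepMonitoring()) {
      Thread.sleep(intervalMs)
      val state = poll()
      if (isTerminal(state)) {
        return Some(state)
      }
    }
    None // monitoring was cancelled by the predicate
  }

  def main(args: Array[String]): Unit = {
    var stopping = false
    val states = Iterator("ACCEPTED", "RUNNING", "FINISHED")
    // Keep monitoring while not stopping, mirroring the backend passing a
    // negation of its `stopping` flag into monitorApplication.
    val result = monitor(10, () => !stopping, () => states.next(), _ == "FINISHED")
    println(s"final state: $result")
  }
}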