Commit 1590141

Address review comments

andrewor14 committed Sep 17, 2014
1 parent: 45ccdea

Showing 6 changed files with 48 additions and 27 deletions.
Client.scala (yarn alpha)

@@ -125,7 +125,7 @@ private[spark] class Client(
     Option(report.getClientToken).getOrElse("")
   }
 
-private[spark] object Client {
+object Client {
   def main(argStrings: Array[String]) {
     if (!sys.props.contains("SPARK_SUBMIT")) {
       println("WARNING: This client is deprecated and will be removed in a " +
@@ -145,5 +145,7 @@ private[spark] object Client {
         Console.err.println(e.getMessage)
        System.exit(1)
    }
+
+    System.exit(0)
  }
}
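Note on the System.exit(0) added above (and mirrored in the stable Client.scala at the bottom of this commit): main now ends with an explicit zero exit code instead of simply falling off the end. One concrete effect of System.exit is that it terminates the JVM even if non-daemon threads are still running. A minimal sketch of the resulting main() shape; run() is a placeholder, not the real submission logic:

object ClientMainShape {
  private def run(): Unit = ()  // placeholder for the real client logic

  def main(args: Array[String]): Unit = {
    try {
      run()
    } catch {
      case e: Exception =>
        // Mirrors the diff: report the failure and exit non-zero...
        Console.err.println(e.getMessage)
        System.exit(1)
    }
    // ...and exit zero explicitly on success, terminating the JVM even if
    // non-daemon threads are still alive.
    System.exit(0)
  }
}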
ClientArguments.scala

@@ -47,22 +47,20 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
     "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
 
   parseArgs(args.toList)
-  loadDefaultArgs()
+  loadEnvironmentArgs()
   validateArgs()
 
   /** Load any default arguments provided through environment variables and Spark properties. */
-  private def loadDefaultArgs(): Unit = {
+  private def loadEnvironmentArgs(): Unit = {
     // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
     // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
-    files = Option(files).orElse(sys.env.get("SPARK_YARN_DIST_FILES")).orNull
     files = Option(files)
-      .orElse(sparkConf.getOption("spark.yarn.dist.files"))
-      .map(p => Utils.resolveURIs(p))
+      .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
+      .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)))
       .orNull
-    archives = Option(archives).orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")).orNull
     archives = Option(archives)
-      .orElse(sparkConf.getOption("spark.yarn.dist.archives"))
-      .map(p => Utils.resolveURIs(p))
+      .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
+      .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
       .orNull
   }
 
@@ -71,6 +69,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
    * This is intended to be called only after the provided arguments have been parsed.
    */
   private def validateArgs(): Unit = {
+    // TODO: memory checks are outdated (SPARK-3476)
     Map[Boolean, String](
       (numExecutors <= 0) -> "You must specify at least 1 executor!",
       (amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB",
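Note on the loadEnvironmentArgs rewrite above: the new orElse chain makes the precedence explicit (existing argument, then the SPARK_YARN_DIST_* environment variable, then the spark.yarn.dist.* property) and applies Utils.resolveURIs only to the property value, matching the SPARK-2051 comment. A self-contained sketch of the pattern; pick and resolve are illustrative stand-ins, not Spark code:

object OrElsePrecedence {
  // Illustrative stand-in for Utils.resolveURIs.
  private def resolve(path: String): String = s"file:$path"

  // Mirrors the diff: the env value wins over the property, and only the
  // property value is passed through resolve().
  private def pick(arg: Option[String], env: Option[String], prop: Option[String]): String =
    arg
      .orElse(env)
      .orElse(prop.map(resolve))
      .orNull

  def main(args: Array[String]): Unit = {
    assert(pick(None, Some("a.txt"), Some("b.txt")) == "a.txt")  // env wins, unresolved
    assert(pick(None, None, Some("b.txt")) == "file:b.txt")      // property value is resolved
    assert(pick(None, None, None) == null)                       // orNull keeps the var-style null
    println("precedence checks passed")
  }
}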
ClientBase.scala

@@ -36,7 +36,9 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
+
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.util.Utils
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN.
@@ -217,7 +219,6 @@ private[spark] trait ClientBase extends Logging {
       sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
     }
 
-    UserGroupInformation.getCurrentUser().addCredentials(credentials)
     localResources
   }
 
@@ -360,9 +361,9 @@ private[spark] trait ClientBase extends Logging {
     }
     val amClass =
       if (isLaunchingDriver) {
-        classOf[ApplicationMaster].getName()
+        Utils.getFormattedClassName(ApplicationMaster)
       } else {
-        classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
+        Utils.getFormattedClassName(ExecutorLauncher)
       }
     val userArgs = args.userArgs.flatMap { arg =>
       Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
@@ -400,6 +401,8 @@ private[spark] trait ClientBase extends Logging {
     val securityManager = new SecurityManager(sparkConf)
     amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
     setupSecurityToken(amContainer)
+    UserGroupInformation.getCurrentUser().addCredentials(credentials)
+
     amContainer
   }
 
@@ -409,22 +412,24 @@ private[spark] trait ClientBase extends Logging {
    *
    * @param returnOnRunning Whether to also return the application state when it is RUNNING.
    * @param logApplicationReport Whether to log details of the application report every iteration.
+   * @param shouldKeepMonitoring The condition to keep monitoring.
    * @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING.
    */
   def monitorApplication(
       appId: ApplicationId,
       returnOnRunning: Boolean = false,
-      logApplicationReport: Boolean = true): YarnApplicationState = {
+      logApplicationReport: Boolean = true,
+      shouldKeepMonitoring: () => Boolean = () => true): YarnApplicationState = {
     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-    while (true) {
+    var firstIteration = true
+    while (shouldKeepMonitoring()) {
       Thread.sleep(interval)
       val report = getApplicationReport(appId)
       val state = report.getYarnApplicationState
 
       if (logApplicationReport) {
-        logInfo(s"Application report from ResourceManager for application ${appId.getId} " +
-          s"(state: $state)")
-        logDebug("\n" +
+        logInfo(s"Application report from ResourceManager for app ${appId.getId} (state: $state)")
+        val details = "\n" +
           s"\t full application identifier: $appId\n" +
           s"\t clientToken: ${getClientToken(report)}\n" +
           s"\t appDiagnostics: ${report.getDiagnostics}\n" +
@@ -435,7 +440,14 @@ private[spark] trait ClientBase extends Logging {
           s"\t yarnAppState: $state\n" +
           s"\t distributedFinalState: ${report.getFinalApplicationStatus}\n" +
           s"\t appTrackingUrl: ${report.getTrackingUrl}\n" +
-          s"\t appUser: ${report.getUser}")
+          s"\t appUser: ${report.getUser}"
+
+        // Log report details every iteration if DEBUG is enabled, otherwise only the first
+        if (log.isDebugEnabled) {
+          logDebug(details)
+        } else if (firstIteration) {
+          logInfo(details)
+        }
       }
 
       if (state == YarnApplicationState.FINISHED ||
@@ -447,6 +459,8 @@ private[spark] trait ClientBase extends Logging {
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
         return state
       }
+
+      firstIteration = false
     }
     // Never reached, but keeps compiler happy
     throw new SparkException("While loop is depleted! This should never happen...")
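Note on monitorApplication above: the shouldKeepMonitoring parameter turns the old unbounded while (true) into a caller-controlled loop (and, strictly speaking, makes the trailing "Never reached" SparkException reachable once the predicate can return false). A standalone sketch of the predicate-driven loop; the names and states here are illustrative:

object MonitorLoopSketch {
  // Stand-in for monitorApplication: polls until a terminal state is seen
  // or the caller-supplied predicate tells it to stop.
  private def monitor(shouldKeepMonitoring: () => Boolean = () => true): String = {
    var polls = 0
    while (shouldKeepMonitoring()) {
      Thread.sleep(10)                 // stand-in for spark.yarn.report.interval
      polls += 1
      if (polls >= 3) {
        return "FINISHED"              // stand-in for a terminal YARN state
      }
    }
    "ABORTED"                          // reached only when the predicate goes false
  }

  def main(args: Array[String]): Unit = {
    println(monitor())                 // default: behaves like the old while (true)
    var keepGoing = false
    println(monitor(() => keepGoing))  // caller stops the loop immediately
  }
}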
YarnSparkHadoopUtil.scala

@@ -42,7 +42,7 @@ import org.apache.spark.util.Utils
 /**
  * Contains util methods to interact with Hadoop from spark.
  */
-private[spark] class YarnSparkHadoopUtil extends SparkHadoopUtil {
+class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
   override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
     dest.addCredentials(source.getCredentials())
@@ -83,7 +83,7 @@ private[spark] class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
 }
 
-private[spark] object YarnSparkHadoopUtil {
+object YarnSparkHadoopUtil {
   // Additional memory overhead - in mb.
   val DEFAULT_MEMORY_OVERHEAD = 384
 
@@ -136,7 +136,9 @@ private[spark] object YarnSparkHadoopUtil {
         m.appendReplacement(sb, Matcher.quoteReplacement(replace))
       }
       m.appendTail(sb)
-      env(parts(0)) = sb.toString
+      // This treats the environment variable as a path variable delimited by `File.pathSeparator`
+      // This is kept for backward compatibility and consistency with Hadoop's behavior
+      addPathToEnvironment(env, parts(0), sb.toString)
     }
   }
 }
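Note on the addPathToEnvironment change above: assigning env(parts(0)) = sb.toString overwrote any existing value, whereas the helper treats the variable as a File.pathSeparator-delimited path list, as the new comment says. The patch does not show the helper's body, so the following is a hypothetical re-creation of an append-style helper, for illustration only:

import java.io.File
import scala.collection.mutable

object PathEnvSketch {
  // Hypothetical append-style helper: keep any existing value and append the
  // new entry after File.pathSeparator (':' on Unix, ';' on Windows).
  private def addPathToEnvironment(
      env: mutable.Map[String, String], key: String, value: String): Unit = {
    val newValue = if (env.contains(key)) env(key) + File.pathSeparator + value else value
    env.put(key, newValue)
  }

  def main(args: Array[String]): Unit = {
    val env = mutable.Map("CLASSPATH" -> "/opt/jars/a.jar")
    addPathToEnvironment(env, "CLASSPATH", "/opt/jars/b.jar")
    println(env("CLASSPATH"))  // on Unix: /opt/jars/a.jar:/opt/jars/b.jar
  }
}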
YarnClientSchedulerBackend.scala

@@ -34,10 +34,11 @@ private[spark] class YarnClientSchedulerBackend(
     minRegisteredRatio = 0.8
   }
 
-  var client: Client = null
-  var appId: ApplicationId = null
-  var stopping: Boolean = false
-  var totalExpectedExecutors = 0
+  private var client: Client = null
+  private var appId: ApplicationId = null
+  private var stopping: Boolean = false
+  private var totalExpectedExecutors = 0
+  private def isStopping(): Boolean = stopping
 
   /**
    * Create a Yarn client to submit an application to the ResourceManager.
@@ -120,7 +121,8 @@ private[spark] class YarnClientSchedulerBackend(
     assert(client != null && appId != null, "Application has not been submitted yet!")
     val t = new Thread {
       override def run() {
-        val state = client.monitorApplication(appId, logApplicationReport = false) // blocking
+        val state = client.monitorApplication(
+          appId, logApplicationReport = false, shouldKeepMonitoring = isStopping) // blocking
         if (state == YarnApplicationState.FINISHED ||
             state == YarnApplicationState.KILLED ||
             state == YarnApplicationState.FAILED) {
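Note on the change above: besides narrowing the fields to private, the backend passes the method isStopping where monitorApplication expects a () => Boolean. Scala eta-expands the method into a function value, so the monitoring thread re-evaluates the stopping flag on every loop iteration rather than capturing a one-time Boolean. A small illustration of that distinction:

object EtaExpansionSketch {
  private var stopping: Boolean = false
  private def isStopping(): Boolean = stopping

  // Takes a predicate, not a Boolean: every call re-reads the underlying var.
  private def readTwice(p: () => Boolean): (Boolean, Boolean) = {
    val before = p()
    stopping = true
    val after = p()
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    // The method is eta-expanded to () => Boolean; prints (false,true) because
    // the second call observes the update to `stopping`.
    println(readTwice(isStopping))
  }
}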
Client.scala (yarn stable)

@@ -123,7 +123,7 @@ private[spark] class Client(
     Option(report.getClientToAMToken).map(_.toString).getOrElse("")
   }
 
-private[spark] object Client {
+object Client {
   def main(argStrings: Array[String]) {
     if (!sys.props.contains("SPARK_SUBMIT")) {
       println("WARNING: This client is deprecated and will be removed in a " +
@@ -143,5 +143,7 @@ private[spark] object Client {
         Console.err.println(e.getMessage)
        System.exit(1)
    }
+
+    System.exit(0)
  }
}
