Clean up log messages + variable naming in ClientBase
andrewor14 committed Sep 10, 2014
1 parent 8766d37 commit e4779b6
Showing 4 changed files with 61 additions and 57 deletions.
File 1 of 4: Client.scala (yarn/alpha)
@@ -59,8 +59,8 @@ private[spark] class Client(
     init(yarnConf)
     start()
 
-    logInfo("Received cluster metric info from ResourceManager, number of NodeManagers: "
-      + getYarnClusterMetrics.getNumNodeManagers)
+    logInfo("Requesting a new application from cluster with %d NodeManagers"
+      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
 
     // Get a new application from our RM.
     val newAppResponse = getNewApplication()
@@ -88,7 +88,7 @@ private[spark] class Client(
       : ContainerLaunchContext = {
     val containerContext = super.createContainerLaunchContext(newAppResponse)
     val capability = Records.newRecord(classOf[Resource])
-    capability.setMemory(getAMMemory(newAppResponse) + memoryOverhead)
+    capability.setMemory(getAMMemory(newAppResponse) + amMemoryOverhead)
     containerContext.setResource(capability)
     containerContext
   }
@@ -123,7 +123,7 @@ private[spark] class Client(
   override def getAMMemory(newAppResponse: GetNewApplicationResponse): Int = {
     val minResMemory = newAppResponse.getMinimumResourceCapability().getMemory()
     val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
-      ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - memoryOverhead)
+      ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - amMemoryOverhead)
     amMemory
   }
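
For intuition, the rounding above asks YARN for AM memory in multiples of the scheduler's minimum allocation: getAMMemory rounds args.amMemory up to the next multiple of minResMemory, then subtracts the overhead that createContainerLaunchContext adds back via setMemory. A minimal sketch of the arithmetic with assumed numbers (not from the commit):

    // All values are hypothetical, for illustration only.
    val minResMemory = 1024      // scheduler's minimum allocation (MB)
    val requestedAM = 2500       // args.amMemory (MB)
    val amMemoryOverhead = 384   // memory overhead (MB)
    val amMemory = ((requestedAM / minResMemory) * minResMemory) +
      ((if ((requestedAM % minResMemory) == 0) 0 else minResMemory) - amMemoryOverhead)
    // amMemory == 2688; setMemory(amMemory + amMemoryOverhead) then requests
    // 3072 MB, i.e. 2500 MB rounded up to the next multiple of 1024 MB
    assert(amMemory + amMemoryOverhead == 3072)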
File 2 of 4: ClientArguments.scala (yarn/common)
@@ -40,8 +40,11 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var priority = 0
 
   // Additional memory to allocate to containers
-  val memoryOverhead = sparkConf.getInt(
+  // For now, use driver's memory overhead as our AM container's memory overhead
+  val amMemoryOverhead = sparkConf.getInt(
     "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+  val executorMemoryOverhead = sparkConf.getInt(
+    "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
 
   parseArgs(args.toList)
   loadDefaultArgs()
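
The AM and executor overheads now come from separate config keys with a shared default, so they can be tuned independently. A minimal sketch of how the lookup behaves, assuming a default of 384 MB for YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD (values otherwise hypothetical):

    import org.apache.spark.SparkConf

    // Hypothetical configuration, for illustration only.
    val conf = new SparkConf()
    conf.set("spark.yarn.driver.memoryOverhead", "512")
    val amOverhead = conf.getInt("spark.yarn.driver.memoryOverhead", 384)         // 512
    val executorOverhead = conf.getInt("spark.yarn.executor.memoryOverhead", 384) // falls back to 384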
@@ -70,8 +73,9 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   private def validateArgs(): Unit = {
     Map[Boolean, String](
       (numExecutors <= 0) -> "You must specify at least 1 executor!",
-      (amMemory <= memoryOverhead) -> s"AM memory must be > $memoryOverhead MB",
-      (executorMemory <= memoryOverhead) -> s"Executor memory must be > $memoryOverhead MB"
+      (amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB",
+      (executorMemory <= executorMemoryOverhead) ->
+        s"Executor memory must be > $executorMemoryOverhead MB"
     ).foreach { case (errorCondition, errorMessage) =>
       if (errorCondition) {
         throw new IllegalArgumentException(errorMessage + "\n" + getUsageMessage())
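
One subtlety of the Map-based validation above: since the map is keyed by a Boolean, two failing checks share the key true and only the last binding survives, so at most one failure is reported per run. A minimal sketch of the idiom with assumed values:

    // Hypothetical values, for illustration only.
    val amMemory = 384
    val amMemoryOverhead = 384
    Map[Boolean, String](
      (amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB"
    ).foreach { case (errorCondition, errorMessage) =>
      if (errorCondition) throw new IllegalArgumentException(errorMessage)
    }
    // throws: java.lang.IllegalArgumentException: AM memory must be > 384 MB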
@@ -80,7 +84,7 @@
   }
 
   private def parseArgs(inputArgs: List[String]): Unit = {
-    val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
+    val userArgsBuffer = new ArrayBuffer[String]()
     var args = inputArgs
 
     while (!args.isEmpty) {
File 3 of 4: ClientBase.scala (yarn/common)
@@ -51,61 +51,59 @@ private[spark] trait ClientBase extends Logging {
   protected val sparkConf: SparkConf
   protected val yarnConf: YarnConfiguration
   protected val credentials = UserGroupInformation.getCurrentUser.getCredentials
-  protected val memoryOverhead = args.memoryOverhead // MB
+  protected val amMemoryOverhead = args.amMemoryOverhead // MB
+  protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
 
   /**
-   * Verify that we have not requested more resources than is available in the cluster.
+   * Fail fast if we have requested more resources per container than is available in the cluster.
    */
-  protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse) = {
+  protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
     val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
-    logInfo(s"Max memory capability of a single resource in this cluster: $maxMem MB")
-
-    // If we have requested more than the cluster maximum for a single resource, exit.
-    if (args.executorMemory > maxMem) {
-      val errorMessage =
-        "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
-          .format(args.executorMemory, maxMem)
-      logError(errorMessage)
-      throw new IllegalArgumentException(errorMessage)
+    logInfo("Verifying our application request has not exceeded the maximum " +
+      s"memory capability of the cluster ($maxMem MB per container)")
+    val executorMem = args.executorMemory + executorMemoryOverhead
+    if (executorMem > maxMem) {
+      throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " +
+        s"is above the max threshold ($maxMem MB) of this cluster!")
     }
-    val amMem = getAMMemory(newAppResponse) + memoryOverhead
+    val amMem = getAMMemory(newAppResponse) + amMemoryOverhead
     if (amMem > maxMem) {
-      val errorMessage =
-        "Required AM memory (%d MB) is above the max threshold (%d MB) of this cluster."
-          .format(amMem, maxMem)
-      logError(errorMessage)
-      throw new IllegalArgumentException(errorMessage)
+      throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " +
+        s"is above the max threshold ($maxMem MB) of this cluster!")
     }
 
     // We could add checks to make sure the entire cluster has enough resources but that involves
     // getting all the node reports and computing ourselves.
   }
 
   /**
-   * Copy the file to a remote file system if needed. Exposed for testing.
+   * Copy the given file to a remote file system if needed. This is used for preparing
+   * resources for launching the ApplicationMaster container. Exposed for testing.
    */
   def copyFileToRemote(
-      dstDir: Path,
-      originalPath: Path,
+      destDir: Path,
+      srcPath: Path,
       replication: Short,
       setPerms: Boolean = false): Path = {
-    val fs = FileSystem.get(hadoopConf)
-    val remoteFs = originalPath.getFileSystem(hadoopConf)
-    var newPath = originalPath
-    if (!compareFs(remoteFs, fs)) {
-      newPath = new Path(dstDir, originalPath.getName())
-      logInfo("Uploading " + originalPath + " to " + newPath)
-      FileUtil.copy(remoteFs, originalPath, fs, newPath, false, hadoopConf)
-      fs.setReplication(newPath, replication)
-      if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
+    val destFs = destDir.getFileSystem(hadoopConf)
+    val srcFs = srcPath.getFileSystem(hadoopConf)
+    var destPath = srcPath
+    if (!compareFs(srcFs, destFs)) {
+      destPath = new Path(destDir, srcPath.getName())
+      logInfo(s"Uploading resource $srcPath -> $destPath")
+      FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
+      destFs.setReplication(destPath, replication)
+      if (setPerms) {
+        destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
+      }
+    } else {
+      logInfo(s"Source and destination file systems are the same. Not copying $srcPath")
     }
     // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
     // version shows the specific version in the distributed cache configuration
-    val qualPath = fs.makeQualified(newPath)
-    val fc = FileContext.getFileContext(qualPath.toUri(), hadoopConf)
-    val destPath = fc.resolvePath(qualPath)
-    destPath
+    val qualifiedDestPath = destFs.makeQualified(destPath)
+    val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf)
+    fc.resolvePath(qualifiedDestPath)
   }
 
   /**
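
For context, a caller hands copyFileToRemote a staging directory and a source path, then uses the returned (fully qualified, symlink-resolved) path when registering the resource with the distributed cache. A minimal usage sketch, assuming a FileSystem handle fs as in prepareLocalResources; the paths and replication factor are hypothetical:

    // Hypothetical inputs, for illustration only.
    val stagingDir = new Path(fs.getHomeDirectory(), ".sparkStaging/application_1234_0001")
    val appJar = new Path("file:///tmp/my-app.jar")
    val remoteJar = copyFileToRemote(stagingDir, appJar, replication = 3)
    // remoteJar points at the copy under stagingDir, or at appJar itself if the
    // source and destination file systems were already the same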
@@ -131,8 +129,9 @@ private[spark] trait ClientBase extends Logging {
    * Exposed for testing.
    */
   def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
-    // Upload Spark and the application JAR to the remote file system if necessary. Add them as
-    // local resources to the application master.
+    logInfo("Preparing resources for our AM container")
+    // Upload Spark and the application JAR to the remote file system if necessary,
+    // and add them as local resources to the application master.
     val fs = FileSystem.get(hadoopConf)
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
     val nns = getNameNodesToAccess(sparkConf) + dst
@@ -215,7 +214,6 @@ private[spark] trait ClientBase extends Logging {
         }
       }
     }
-    logInfo("Prepared Local resources " + localResources)
     if (cachedSecondaryJarLinks.nonEmpty) {
       sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
     }
@@ -228,7 +226,7 @@ private[spark] trait ClientBase extends Logging {
    * Set up the environment for launching our ApplicationMaster container.
    */
   private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
-    logInfo("Setting up the launch environment")
+    logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
     populateClasspath(args, yarnConf, sparkConf, env, extraCp)
@@ -298,7 +296,7 @@ private[spark] trait ClientBase extends Logging {
    */
   protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
       : ContainerLaunchContext = {
-    logInfo("Setting up container launch context")
+    logInfo("Setting up container launch context for our AM")
 
     val appId = newAppResponse.getApplicationId
     val appStagingDir = getAppStagingDir(appId)
@@ -392,11 +390,13 @@
 
     logDebug("===============================================================================")
     logDebug("Yarn AM launch context:")
-    logDebug(s"  user class: ${args.userClass}")
-    logDebug("  env:")
-    launchEnv.foreach { case (k, v) => logDebug(s"    $k -> $v") }
-    logDebug("  command:")
-    logDebug(s"    ${printableCommands.mkString(" ")}")
+    logDebug(s"    user class: ${args.userClass}")
+    logDebug("    env:")
+    launchEnv.foreach { case (k, v) => logDebug(s"        $k -> $v") }
+    logDebug("    resources:")
+    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
+    logDebug("    command:")
+    logDebug(s"        ${printableCommands.mkString(" ")}")
     logDebug("===============================================================================")
 
     // send the acl settings into YARN to control who has access via YARN interfaces
@@ -522,7 +522,7 @@ private[spark] object ClientBase extends Logging {
    * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the
    * user environment if that is not found (for backwards compatibility).
    */
-  private def sparkJar(conf: SparkConf) = {
+  private def sparkJar(conf: SparkConf): String = {
     if (conf.contains(CONF_SPARK_JAR)) {
       conf.get(CONF_SPARK_JAR)
     } else if (System.getenv(ENV_SPARK_JAR) != null) {
File 4 of 4: Client.scala (yarn/stable)
@@ -65,8 +65,8 @@ private[spark] class Client(
     yarnClient.init(yarnConf)
     yarnClient.start()
 
-    logInfo("Received cluster metric info from ResourceManager, number of NodeManagers: "
-      + yarnClient.getYarnClusterMetrics.getNumNodeManagers)
+    logInfo("Requesting a new application from cluster with %d NodeManagers"
+      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
 
     // Get a new application from our RM.
     val newApp = yarnClient.createApplication()
@@ -99,7 +99,7 @@ private[spark] class Client(
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
     val capability = Records.newRecord(classOf[Resource])
-    capability.setMemory(args.amMemory + memoryOverhead)
+    capability.setMemory(args.amMemory + amMemoryOverhead)
     appContext.setResource(capability)
     appContext
   }
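
Putting the pieces together, the capability the stable client registers with the ResourceManager is the user-requested AM memory plus the AM overhead. A sketch with assumed numbers (not from the commit):

    // Hypothetical values, for illustration only.
    val amMemory = 2048          // args.amMemory (MB)
    val amMemoryOverhead = 384   // spark.yarn.driver.memoryOverhead fallback (MB)
    val requested = amMemory + amMemoryOverhead  // 2432 MB requested for the AM container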
