Skip to content

Commit

Permalink
rework
Browse files Browse the repository at this point in the history
  • Loading branch information
tgravescs committed Oct 3, 2014
1 parent 85f1901 commit 24c98e3
Showing 1 changed file with 55 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
Expand Down Expand Up @@ -136,13 +137,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
runExecutorLauncher(securityMgr)
}
} catch {
case e: Throwable => {
case e: Exception =>
// catch everything else if not specifically handled
logError("Uncaught exception: ", e)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + e.getMessage())
}
}
exitCode
}
Expand Down Expand Up @@ -213,6 +213,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,

private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
setupSystemSecurityManager()
userClassThread = startUserClass()

// This a bit hacky, but we need to wait until the spark.driver.port property has
Expand Down Expand Up @@ -258,7 +259,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
val t = new Thread {
override def run() {
var failureCount = 0
while (!finished && !Thread.currentThread().isInterrupted()) {
while (!finished) {
try {
if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finish(FinalApplicationStatus.FAILED,
Expand Down Expand Up @@ -328,7 +329,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkContextRef.synchronized {
var count = 0
val waitTime = 10000L
val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
while (sparkContextRef.get() == null && count < numTries && !finished) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
Expand All @@ -355,7 +356,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// spark driver should already be up since it launched us, but we don't want to
// wait forever, so wait 100 seconds max to match the cluster mode setting.
// Leave this config unpublished for now.
val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.client.waitTries", 1000)
val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000)

while (!driverUp && !finished && count < numTries) {
try {
Expand All @@ -373,7 +374,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
}

if (!driverUp) {
throw new Exception("Failed to connect to driver!")
throw new SparkException("Failed to connect to driver!")
}

sparkConf.set("spark.driver.host", driverHost)
Expand Down Expand Up @@ -403,55 +404,59 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
actor ! AddWebUIFilter(amFilter, params, proxyBase)
}
}
'
/**
* This system security manager applies to the entire process.
* It's main purpose is to handle the case if the user code does a System.exit.
* This allows us to catch that and properly set the YARN application status and
* cleanup if needed.
*/
private def setupSystemSecurityManager() = {
try {
var stopped = false
System.setSecurityManager(new java.lang.SecurityManager() {
override def checkExit(paramInt: Int) {
if (!stopped) {
logInfo("In securityManager checkExit, exit code: " + paramInt)
if (paramInt == 0) {
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
} else {
finish(FinalApplicationStatus.FAILED,
paramInt,
"User class exited with non-zero exit code")
}
stopped = true
}
}
// required for the checkExit to work properly
override def checkPermission(perm: java.security.Permission): Unit = {
}
})
}
catch {
case e: SecurityException =>
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SECURITY,
"Error in setSecurityManager")
logError("Error in setSecurityManager:", e)
}
}

/**
* Start the user class, which contains the spark driver.
* Start the user class, which contains the spark driver, in a separate Thread.
* If the main routine exits cleanly or exits with System.exit(0) we
* assume it was successful, for all other cases we assume failure.
*
* Returns the user thread that was started.
*/
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
System.setProperty("spark.executor.instances", args.numExecutors.toString)
var stopped = false
val mainMethod = Class.forName(args.userClass, false,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])

val userThread = new Thread {
override def run() {

try {
// Note this security manager applies to the entire process, not
// just this thread. It's here to handle the case if the user code
// does System.exit
System.setSecurityManager(new java.lang.SecurityManager() {
override def checkExit(paramInt: Int) {
if (!stopped) {
logInfo("In securityManager checkExit, exit code: " + paramInt)
if (paramInt == 0) {
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
} else {
finish(FinalApplicationStatus.FAILED,
paramInt,
"User class exited with non-zero exit code")
}
stopped = true
}
}

// required for the checkExit to work properly
override def checkPermission(perm: java.security.Permission): Unit = {
}
})
}
catch {
case e: SecurityException => {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SECURITY,
"Error in setSecurityManager")
logError("Error in setSecurityManager:", e)
}
}

try {
val mainArgs = new Array[String](args.userArgs.size)
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
Expand All @@ -463,14 +468,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
e.getCause match {
case _: InterruptedException =>
// Reporter thread can interrupt to stop user class

case e: Throwable => {
case e: Throwable =>
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
"User class threw exception: " + e.getMessage)
// re-throw to get it logged
throw e
}
}
}
}
Expand Down Expand Up @@ -512,13 +515,13 @@ object ApplicationMaster extends Logging {
val SHUTDOWN_HOOK_PRIORITY: Int = 30

// exit codes for different causes, no reason behind the values
val EXIT_SUCCESS = 0
val EXIT_UNCAUGHT_EXCEPTION = 10
val EXIT_MAX_EXECUTOR_FAILURES = 11
val EXIT_REPORTER_FAILURE = 12
val EXIT_SC_NOT_INITED = 13
val EXIT_SECURITY = 14
val EXIT_EXCEPTION_USER_CLASS = 15
private val EXIT_SUCCESS = 0
private val EXIT_UNCAUGHT_EXCEPTION = 10
private val EXIT_MAX_EXECUTOR_FAILURES = 11
private val EXIT_REPORTER_FAILURE = 12
private val EXIT_SC_NOT_INITED = 13
private val EXIT_SECURITY = 14
private val EXIT_EXCEPTION_USER_CLASS = 15

private var master: ApplicationMaster = _

Expand Down

0 comments on commit 24c98e3

Please sign in to comment.