diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 9ad700635810a..86d6e69982f30 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.SparkException import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -136,13 +137,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, runExecutorLauncher(securityMgr) } } catch { - case e: Throwable => { + case e: Exception => // catch everything else if not specifically handled logError("Uncaught exception: ", e) finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, "Uncaught exception: " + e.getMessage()) - } } exitCode } @@ -213,6 +213,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private def runDriver(securityMgr: SecurityManager): Unit = { addAmIpFilter() + setupSystemSecurityManager() userClassThread = startUserClass() // This a bit hacky, but we need to wait until the spark.driver.port property has @@ -258,7 +259,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, val t = new Thread { override def run() { var failureCount = 0 - while (!finished && !Thread.currentThread().isInterrupted()) { + while (!finished) { try { if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finish(FinalApplicationStatus.FAILED, @@ -328,7 +329,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, 
sparkContextRef.synchronized { var count = 0 val waitTime = 10000L - val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10) + val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) while (sparkContextRef.get() == null && count < numTries && !finished) { logInfo("Waiting for spark context initialization ... " + count) count = count + 1 @@ -355,7 +356,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // spark driver should already be up since it launched us, but we don't want to // wait forever, so wait 100 seconds max to match the cluster mode setting. // Leave this config unpublished for now. - val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.client.waitTries", 1000) + val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000) while (!driverUp && !finished && count < numTries) { try { @@ -373,7 +374,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, } if (!driverUp) { - throw new Exception("Failed to connect to driver!") + throw new SparkException("Failed to connect to driver!") } sparkConf.set("spark.driver.host", driverHost) @@ -403,55 +404,59 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, actor ! AddWebUIFilter(amFilter, params, proxyBase) } } + ' + /** + * This system security manager applies to the entire process. + * Its main purpose is to handle the case if the user code does a System.exit. + * This allows us to catch that and properly set the YARN application status and + * clean up if needed. 
+ */ + private def setupSystemSecurityManager() = { + try { + var stopped = false + System.setSecurityManager(new java.lang.SecurityManager() { + override def checkExit(paramInt: Int) { + if (!stopped) { + logInfo("In securityManager checkExit, exit code: " + paramInt) + if (paramInt == 0) { + finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) + } else { + finish(FinalApplicationStatus.FAILED, + paramInt, + "User class exited with non-zero exit code") + } + stopped = true + } + } + // required for the checkExit to work properly + override def checkPermission(perm: java.security.Permission): Unit = { + } + }) + } + catch { + case e: SecurityException => + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_SECURITY, + "Error in setSecurityManager") + logError("Error in setSecurityManager:", e) + } + } /** - * Start the user class, which contains the spark driver. + * Start the user class, which contains the spark driver, in a separate Thread. * If the main routine exits cleanly or exits with System.exit(0) we * assume it was successful, for all other cases we assume failure. + * + * Returns the user thread that was started. */ private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") System.setProperty("spark.executor.instances", args.numExecutors.toString) - var stopped = false val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val userThread = new Thread { override def run() { - - try { - // Note this security manager applies to the entire process, not - // just this thread. 
It's here to handle the case if the user code - // does System.exit - System.setSecurityManager(new java.lang.SecurityManager() { - override def checkExit(paramInt: Int) { - if (!stopped) { - logInfo("In securityManager checkExit, exit code: " + paramInt) - if (paramInt == 0) { - finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) - } else { - finish(FinalApplicationStatus.FAILED, - paramInt, - "User class exited with non-zero exit code") - } - stopped = true - } - } - - // required for the checkExit to work properly - override def checkPermission(perm: java.security.Permission): Unit = { - } - }) - } - catch { - case e: SecurityException => { - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_SECURITY, - "Error in setSecurityManager") - logError("Error in setSecurityManager:", e) - } - } - try { val mainArgs = new Array[String](args.userArgs.size) args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) @@ -463,14 +468,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, e.getCause match { case _: InterruptedException => // Reporter thread can interrupt to stop user class - - case e: Throwable => { + case e: Throwable => finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS, "User class threw exception: " + e.getMessage) // re-throw to get it logged throw e - } } } } @@ -512,13 +515,13 @@ object ApplicationMaster extends Logging { val SHUTDOWN_HOOK_PRIORITY: Int = 30 // exit codes for different causes, no reason behind the values - val EXIT_SUCCESS = 0 - val EXIT_UNCAUGHT_EXCEPTION = 10 - val EXIT_MAX_EXECUTOR_FAILURES = 11 - val EXIT_REPORTER_FAILURE = 12 - val EXIT_SC_NOT_INITED = 13 - val EXIT_SECURITY = 14 - val EXIT_EXCEPTION_USER_CLASS = 15 + private val EXIT_SUCCESS = 0 + private val EXIT_UNCAUGHT_EXCEPTION = 10 + private val EXIT_MAX_EXECUTOR_FAILURES = 11 + private val EXIT_REPORTER_FAILURE = 12 + private val EXIT_SC_NOT_INITED = 13 + private val EXIT_SECURITY = 14 
+ private val EXIT_EXCEPTION_USER_CLASS = 15 private var master: ApplicationMaster = _