From f61071b6fc63c976d1fc4dbee3473e4e76f12539 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sat, 9 May 2015 19:05:30 +0900 Subject: [PATCH 1/3] Fixed cleanup problem --- .../org/apache/spark/deploy/yarn/Client.scala | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 20ecaf092e3f8..99f7c3f2d218d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.yarn -import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream} +import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException} import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import java.nio.ByteBuffer import java.security.PrivilegedExceptionAction @@ -515,7 +515,28 @@ private[spark] class Client( logInfo("Setting up container launch context for our AM") val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) - val localResources = prepareLocalResources(appStagingDir) + var localResources: Map[String, LocalResource] = null + try { + localResources = prepareLocalResources(appStagingDir) + } catch { + case e: Throwable => + var stagingDirPath: Path = null + try { + val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false) + if (!preserveFiles) { + stagingDirPath = new Path(appStagingDir) + logInfo("Deleting staging directory " + stagingDirPath) + val fs = FileSystem.get(hadoopConf) + fs.delete(stagingDirPath, true) + } + } catch { + case ioe: IOException => + logError("Failed to cleanup staging dir " + stagingDirPath, ioe) + } finally { + throw e + } + } + val launchEnv = setupLaunchEnv(appStagingDir) val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) 
amContainer.setLocalResources(localResources) From 882f921215cd063015322e9498ef3614362633f7 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 13 May 2015 11:41:09 +0900 Subject: [PATCH 2/3] Wrapped Client#submitApplication with try/catch blocks in order to delete resources on error --- .../org/apache/spark/deploy/yarn/Client.scala | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f77eb55c87a98..195c35a592f9d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -91,30 +91,51 @@ private[spark] class Client( * available in the alpha API. */ def submitApplication(): ApplicationId = { - // Setup the credentials before doing anything else, so we have don't have issues at any point. - setupCredentials() - yarnClient.init(yarnConf) - yarnClient.start() - - logInfo("Requesting a new application from cluster with %d NodeManagers" - .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers)) - - // Get a new application from our RM - val newApp = yarnClient.createApplication() - val newAppResponse = newApp.getNewApplicationResponse() - val appId = newAppResponse.getApplicationId() - - // Verify whether the cluster has enough resources for our AM - verifyClusterResources(newAppResponse) - - // Set up the appropriate contexts to launch our AM - val containerContext = createContainerLaunchContext(newAppResponse) - val appContext = createApplicationSubmissionContext(newApp, containerContext) - - // Finally, submit and monitor the application - logInfo(s"Submitting application ${appId.getId} to ResourceManager") - yarnClient.submitApplication(appContext) - appId + var appId: ApplicationId = null + try { + // Setup the credentials before doing anything else, so we have don't have issues at any point. 
+ setupCredentials() + yarnClient.init(yarnConf) + yarnClient.start() + + logInfo("Requesting a new application from cluster with %d NodeManagers" + .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers)) + + // Get a new application from our RM + val newApp = yarnClient.createApplication() + val newAppResponse = newApp.getNewApplicationResponse() + appId = newAppResponse.getApplicationId() + + // Verify whether the cluster has enough resources for our AM + verifyClusterResources(newAppResponse) + + // Set up the appropriate contexts to launch our AM + val containerContext = createContainerLaunchContext(newAppResponse) + val appContext = createApplicationSubmissionContext(newApp, containerContext) + + // Finally, submit and monitor the application + logInfo(s"Submitting application ${appId.getId} to ResourceManager") + yarnClient.submitApplication(appContext) + appId + } catch { + case e: Throwable => + if (appId != null) { + val appStagingDir = getAppStagingDir(appId) + try { + val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false) + val stagingDirPath = new Path(appStagingDir) + val fs = FileSystem.get(hadoopConf) + if (!preserveFiles && fs.exists(stagingDirPath)) { + logInfo("Deleting staging directory " + stagingDirPath) + fs.delete(stagingDirPath, true) + } + } catch { + case ioe: IOException => + logWarning("Failed to cleanup staging dir " + appStagingDir, ioe) + } + } + throw e + } } /** @@ -526,28 +547,7 @@ private[spark] class Client( logInfo("Setting up container launch context for our AM") val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) - var localResources: Map[String, LocalResource] = null - try { - localResources = prepareLocalResources(appStagingDir) - } catch { - case e: Throwable => - var stagingDirPath: Path = null - try { - val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false) - if (!preserveFiles) { - stagingDirPath = new 
Path(appStagingDir) + logInfo("Deleting staging directory " + stagingDirPath) + val fs = FileSystem.get(hadoopConf) + fs.delete(stagingDirPath, true) + } + } catch { + case ioe: IOException => + logError("Failed to cleanup staging dir " + stagingDirPath, ioe) + } finally { + throw e + } + } - + val localResources = prepareLocalResources(appStagingDir) val launchEnv = setupLaunchEnv(appStagingDir) val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) amContainer.setLocalResources(localResources) From caef9f40d135136b3c163da58922192a6443a935 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 13 May 2015 11:51:40 +0900 Subject: [PATCH 3/3] Fixed style --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 195c35a592f9d..7e023f2d92578 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -93,7 +93,8 @@ private[spark] class Client( def submitApplication(): ApplicationId = { var appId: ApplicationId = null try { - // Setup the credentials before doing anything else, so we have don't have issues at any point. + // Setup the credentials before doing anything else, + // so we don't have issues at any point. setupCredentials() yarnClient.init(yarnConf) yarnClient.start()