[SPARK-7503] [YARN] Resources in .sparkStaging directory can't be cleaned up on error

When we run applications on YARN in cluster mode, resources uploaded to the .sparkStaging directory are not cleaned up if uploading the local resources fails.

You can reproduce this issue by running the following command.
```
bin/spark-submit --master yarn --deploy-mode cluster --class <someClassName> <non-existing-jar>
```
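
For illustration only, here is a minimal standalone sketch of the cleanup-on-error pattern the patch applies: delete the application's .sparkStaging directory unless the user asked to preserve it. The object and method names (StagingCleanupSketch, cleanupStagingDir) and the explicit parameters are assumptions made for this sketch; in the actual patch the logic lives inline in the catch block of Client#submitApplication and reads spark.yarn.preserve.staging.files from sparkConf.

```
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: the real change is inline in Client#submitApplication (see the diff below).
object StagingCleanupSketch {
  // Best-effort deletion of an application's staging directory. A failure to
  // clean up is logged and swallowed so it cannot mask the original error.
  def cleanupStagingDir(
      appStagingDir: String,
      hadoopConf: Configuration,
      preserveFiles: Boolean): Unit = {
    try {
      val stagingDirPath = new Path(appStagingDir)
      val fs = FileSystem.get(hadoopConf)
      if (!preserveFiles && fs.exists(stagingDirPath)) {
        // Recursively delete everything uploaded for this application
        fs.delete(stagingDirPath, true)
      }
    } catch {
      case ioe: IOException =>
        System.err.println(s"Failed to clean up staging dir $appStagingDir: $ioe")
    }
  }
}
```

Setting spark.yarn.preserve.staging.files=true keeps the uploaded files around even on failure, which is useful when debugging why an upload failed.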

Author: Kousuke Saruta <[email protected]>

Closes apache#6026 from sarutak/delete-uploaded-resources-on-error and squashes the following commits:

caef9f4 [Kousuke Saruta] Fixed style
882f921 [Kousuke Saruta] Wrapped Client#submitApplication with try/catch blocks in order to delete resources on error
1786ca4 [Kousuke Saruta] Merge branch 'master' of https://github.com/apache/spark into delete-uploaded-resources-on-error
f61071b [Kousuke Saruta] Fixed cleanup problem
sarutak authored and srowen committed May 15, 2015
1 parent fdf5bba commit c64ff80
Showing 1 changed file with 47 additions and 25 deletions.
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala (72 changes: 47 additions & 25 deletions)
```
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 import java.security.PrivilegedExceptionAction
@@ -91,30 +91,52 @@ private[spark] class Client(
    * available in the alpha API.
    */
   def submitApplication(): ApplicationId = {
-    // Setup the credentials before doing anything else, so we have don't have issues at any point.
-    setupCredentials()
-    yarnClient.init(yarnConf)
-    yarnClient.start()
-
-    logInfo("Requesting a new application from cluster with %d NodeManagers"
-      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
-
-    // Get a new application from our RM
-    val newApp = yarnClient.createApplication()
-    val newAppResponse = newApp.getNewApplicationResponse()
-    val appId = newAppResponse.getApplicationId()
-
-    // Verify whether the cluster has enough resources for our AM
-    verifyClusterResources(newAppResponse)
-
-    // Set up the appropriate contexts to launch our AM
-    val containerContext = createContainerLaunchContext(newAppResponse)
-    val appContext = createApplicationSubmissionContext(newApp, containerContext)
-
-    // Finally, submit and monitor the application
-    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
-    yarnClient.submitApplication(appContext)
-    appId
+    var appId: ApplicationId = null
+    try {
+      // Setup the credentials before doing anything else,
+      // so we have don't have issues at any point.
+      setupCredentials()
+      yarnClient.init(yarnConf)
+      yarnClient.start()
+
+      logInfo("Requesting a new application from cluster with %d NodeManagers"
+        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
+
+      // Get a new application from our RM
+      val newApp = yarnClient.createApplication()
+      val newAppResponse = newApp.getNewApplicationResponse()
+      appId = newAppResponse.getApplicationId()
+
+      // Verify whether the cluster has enough resources for our AM
+      verifyClusterResources(newAppResponse)
+
+      // Set up the appropriate contexts to launch our AM
+      val containerContext = createContainerLaunchContext(newAppResponse)
+      val appContext = createApplicationSubmissionContext(newApp, containerContext)
+
+      // Finally, submit and monitor the application
+      logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+      yarnClient.submitApplication(appContext)
+      appId
+    } catch {
+      case e: Throwable =>
+        if (appId != null) {
+          val appStagingDir = getAppStagingDir(appId)
+          try {
+            val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
+            val stagingDirPath = new Path(appStagingDir)
+            val fs = FileSystem.get(hadoopConf)
+            if (!preserveFiles && fs.exists(stagingDirPath)) {
+              logInfo("Deleting staging directory " + stagingDirPath)
+              fs.delete(stagingDirPath, true)
+            }
+          } catch {
+            case ioe: IOException =>
+              logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
+          }
+        }
+        throw e
+    }
   }
 
   /**
```
