Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7503][YARN] Resources in .sparkStaging directory can't be cleaned up on error #6026

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 47 additions & 25 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy.yarn

import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction
Expand Down Expand Up @@ -91,30 +91,52 @@ private[spark] class Client(
* available in the alpha API.
*/
def submitApplication(): ApplicationId = {
// Setup the credentials before doing anything else, so we have don't have issues at any point.
setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()

logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

// Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
val appId = newAppResponse.getApplicationId()

// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)

// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)

// Finally, submit and monitor the application
logInfo(s"Submitting application ${appId.getId} to ResourceManager")
yarnClient.submitApplication(appContext)
appId
var appId: ApplicationId = null
try {
// Setup the credentials before doing anything else,
// so we have don't have issues at any point.
setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()

logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

// Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()

// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)

// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)

// Finally, submit and monitor the application
logInfo(s"Submitting application ${appId.getId} to ResourceManager")
yarnClient.submitApplication(appContext)
appId
} catch {
case e: Throwable =>
if (appId != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose there's no great way to share this with similar code in ApplicationMaster.scala? maybe not. This could be made into a private method, as it is in the other similar code block, but it's not a big deal. LGTM.

val appStagingDir = getAppStagingDir(appId)
try {
val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
val stagingDirPath = new Path(appStagingDir)
val fs = FileSystem.get(hadoopConf)
if (!preserveFiles && fs.exists(stagingDirPath)) {
logInfo("Deleting staging directory " + stagingDirPath)
fs.delete(stagingDirPath, true)
}
} catch {
case ioe: IOException =>
logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
}
}
throw e
}
}

/**
Expand Down