Skip to content

Commit

Permalink
Fixed cleanup problem
Browse files Browse the repository at this point in the history
  • Loading branch information
sarutak committed May 9, 2015
1 parent 14502d5 commit f61071b
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.deploy.yarn

import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction
Expand Down Expand Up @@ -515,7 +515,28 @@ private[spark] class Client(
logInfo("Setting up container launch context for our AM")
val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
var localResources: Map[String, LocalResource] = null
try {
localResources = prepareLocalResources(appStagingDir)
} catch {
case e: Throwable =>
var stagingDirPath: Path = null
try {
val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
if (!preserveFiles) {
stagingDirPath = new Path(appStagingDir)
logInfo("Deleting staging directory " + stagingDirPath)
val fs = FileSystem.get(hadoopConf)
fs.delete(stagingDirPath, true)
}
} catch {
case ioe: IOException =>
logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
} finally {
throw e
}
}

val launchEnv = setupLaunchEnv(appStagingDir)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
Expand Down

0 comments on commit f61071b

Please sign in to comment.