Skip to content

Commit

Permalink
Make Utils.deleteRecursively try to delete all paths even when an exc…
Browse files Browse the repository at this point in the history
…eption occurs; use one shutdown hook instead of one per method call to delete temp dirs
  • Loading branch information
srowen committed Oct 6, 2014
1 parent 3a0faa4 commit da0146d
Showing 1 changed file with 36 additions and 17 deletions.
53 changes: 36 additions & 17 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,20 @@ private[spark] object Utils extends Logging {
private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()

// Add a shutdown hook to delete the temp dirs when the JVM exits
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
shutdownDeletePaths.foreach { dirPath =>
try {
Utils.deleteRecursively(new File(dirPath))
} catch {
case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
}
}
})

// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
val absolutePath = file.getAbsolutePath()
Expand Down Expand Up @@ -251,15 +265,8 @@ private[spark] object Utils extends Logging {
} catch { case e: IOException => ; }
}

dir.deleteOnExit()
registerShutdownDeleteDir(dir)

// Add a shutdown hook to delete the temp dir when the JVM exits
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
override def run() {
// Attempt to delete if some patch which is parent of this is not already registered.
if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
}
})
dir
}

Expand Down Expand Up @@ -666,15 +673,27 @@ private[spark] object Utils extends Logging {
*/
def deleteRecursively(file: File) {
if (file != null) {
if (file.isDirectory() && !isSymlink(file)) {
for (child <- listFilesSafely(file)) {
deleteRecursively(child)
try {
if (file.isDirectory && !isSymlink(file)) {
var savedIOException: IOException = null
for (child <- listFilesSafely(file)) {
try {
deleteRecursively(child)
} catch {
// In case of multiple exceptions, only last one will be thrown
case ioe: IOException => savedIOException = ioe
}
}
if (savedIOException != null) {
throw savedIOException
}
}
}
if (!file.delete()) {
// Delete can also fail if the file simply did not exist
if (file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath)
} finally {
if (!file.delete()) {
// Delete can also fail if the file simply did not exist
if (file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath)
}
}
}
}
Expand Down Expand Up @@ -713,7 +732,7 @@ private[spark] object Utils extends Logging {
*/
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
if (!dir.isDirectory) {
throw new IllegalArgumentException("$dir is not a directory!")
throw new IllegalArgumentException(s"$dir is not a directory!")
}
val filesAndDirs = dir.listFiles()
val cutoffTimeInMillis = System.currentTimeMillis - (cutoff * 1000)
Expand Down

0 comments on commit da0146d

Please sign in to comment.