Skip to content

Commit

Permalink
Review feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcelo Vanzin committed Dec 22, 2014
1 parent c0e5ea5 commit 50eb4b9
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ private[deploy] object DeployMessages {

case class KillDriver(driverId: String) extends DeployMessage

case class ApplicationFinished(id: String)

// Worker internal

case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
Expand Down Expand Up @@ -176,8 +178,4 @@ private[deploy] object DeployMessages {

case object SendHeartbeat

// Application finished message, used for cleanup

case class ApplicationFinished(id: String)

}
19 changes: 11 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,15 @@ private[spark] class Worker(
// Create local dirs for the executor. These are passed to the executor via the
// SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
// application finishes.
val appLocalDirs = appDirectories.get(appId).getOrElse {
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
Utils.createDirectory(dir).getAbsolutePath()
}.toSeq
val appLocalDirs = appDirectories.synchronized {
val dirs = appDirectories.get(appId).getOrElse {
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
Utils.createDirectory(dir).getAbsolutePath()
}.toSeq
}
appDirectories(appId) = dirs
dirs
}
appDirectories(appId) = appLocalDirs

val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
Expand Down Expand Up @@ -469,13 +472,13 @@ private[spark] class Worker(
registerWithMaster()
}

private def maybeCleanupApplication(id: String): Unit = synchronized {
private def maybeCleanupApplication(id: String): Unit = appDirectories.synchronized {
val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
if (shouldCleanup) {
finishedApps -= id
appDirectories.remove(id).foreach {
appDirectories.remove(id).foreach { dirList =>
logInfo(s"Cleaning up local directories for application $id")
_.foreach { dir =>
dirList.foreach { dir =>
Utils.deleteRecursively(new File(dir))
}
}
Expand Down

0 comments on commit 50eb4b9

Please sign in to comment.