diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 1415d625625b1..243d8edb72ed3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -88,6 +88,8 @@ private[deploy] object DeployMessages { case class KillDriver(driverId: String) extends DeployMessage + case class ApplicationFinished(id: String) + // Worker internal case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders @@ -176,8 +178,4 @@ private[deploy] object DeployMessages { case object SendHeartbeat - // Application finished message, used for cleanup - - case class ApplicationFinished(id: String) - } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8b560714d374b..3a3db0fb1e548 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -342,12 +342,15 @@ private[spark] class Worker( // Create local dirs for the executor. These are passed to the executor via the // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the // application finishes. - val appLocalDirs = appDirectories.get(appId).getOrElse { - Utils.getOrCreateLocalRootDirs(conf).map { dir => - Utils.createDirectory(dir).getAbsolutePath() - }.toSeq + val appLocalDirs = appDirectories.synchronized { + val dirs = appDirectories.get(appId).getOrElse { + Utils.getOrCreateLocalRootDirs(conf).map { dir => + Utils.createDirectory(dir).getAbsolutePath() + }.toSeq + } + appDirectories(appId) = dirs + dirs } - appDirectories(appId) = appLocalDirs val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs, @@ -469,13 +472,13 @@ private[spark] class Worker( registerWithMaster() } - private def maybeCleanupApplication(id: String): Unit = synchronized { + private def maybeCleanupApplication(id: String): Unit = appDirectories.synchronized { val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id) if (shouldCleanup) { finishedApps -= id - appDirectories.remove(id).foreach { + appDirectories.remove(id).foreach { dirList => logInfo(s"Cleaning up local directories for application $id") - _.foreach { dir => + dirList.foreach { dir => Utils.deleteRecursively(new File(dir)) } }