diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index 0e3750fdde415..3ad08ce0f110b 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -46,6 +46,15 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo def stop() { httpServer.stop() + + // If we only stop sc, but the driver process still run as a services then we need to delete + // the tmp dir, if not, it will create too many tmp dirs + try { + Utils.deleteRecursively(baseDir) + } catch { + case e: Exception => + logWarning(s"Exception while deleting Spark temp dir: ${baseDir.getAbsolutePath}", e) + } } def addFile(file: File) : String = { diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 19d507c0cf860..b6b96da76af47 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -76,6 +76,8 @@ class SparkEnv ( // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() + private var driverTmpDirToDelete: Option[String] = None + private[spark] def stop() { pythonWorkers.foreach { case(key, worker) => worker.stop() } httpFileServer.stop() @@ -90,6 +92,22 @@ class SparkEnv ( // down, but let's call it anyway in case it gets fixed in a later release // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it. // actorSystem.awaitTermination() + + // If we only stop sc, but the driver process still run as a services then we need to delete + // the tmp dir, if not, it will create too many tmp dirs. + // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the + // current working dir in executor which we do not need to delete. + driverTmpDirToDelete match { + case Some(path) => { + try { + Utils.deleteRecursively(new File(path)) + } catch { + case e: Exception => + logWarning(s"Exception while deleting Spark temp dir: $path", e) + } + } + case None => // We just need to delete tmp dir created by driver, so do nothing on executor + } } private[spark] @@ -248,7 +266,7 @@ object SparkEnv extends Logging { "levels using the RDD.persist() method instead.") } - new SparkEnv( + val envInstance = new SparkEnv( executorId, actorSystem, serializer, @@ -264,6 +282,15 @@ object SparkEnv extends Logging { sparkFilesDir, metricsSystem, conf) + + // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is + // called, and we only need to do it for driver. Because driver may run as a service, and if we + // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs. + if (isDriver) { + envInstance.driverTmpDirToDelete = Some(sparkFilesDir) + } + + envInstance } /**