Add a member to store a reference to the tmp dir
Sephiroth-Lin committed Feb 10, 2015
1 parent b2018a5 commit 1339c96
Showing 1 changed file with 19 additions and 9 deletions.
core/src/main/scala/org/apache/spark/SparkEnv.scala (19 additions & 9 deletions)
@@ -76,6 +76,8 @@ class SparkEnv (
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

private var tmpFilesDir: Option[String] = None

private[spark] def stop() {
isStopped = true
pythonWorkers.foreach { case(key, worker) => worker.stop() }
@@ -98,13 +100,16 @@ class SparkEnv (
// the tmp dir; if not, it will create too many tmp dirs.
// We only need to delete the tmp dir created by the driver, because sparkFilesDir points to the
// current working dir in the executor, which we do not need to delete.
if (sparkFilesDir != ".") {
try {
Utils.deleteRecursively(new File(sparkFilesDir))
} catch {
case e: Exception =>
logWarning(s"Exception while deleting Spark temp dir: $sparkFilesDir", e)
tmpFilesDir match {
case Some(path) => {
try {
Utils.deleteRecursively(new File(path))
} catch {
case e: Exception =>
logWarning(s"Exception while deleting Spark temp dir: $path", e)
}
}
case None => // Only the tmp dir created by the driver needs deleting, so do nothing on executors
}
}

@@ -351,8 +356,6 @@ object SparkEnv extends Logging {
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
// As we use this value to decide whether if we need to delete the tmp file in stop(), so if you
// want to change this code please be careful.
val sparkFilesDir: String = if (isDriver) {
Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
} else {
@@ -365,7 +368,7 @@ object SparkEnv extends Logging {
"levels using the RDD.persist() method instead.")
}

new SparkEnv(
val envInstance = new SparkEnv(
executorId,
actorSystem,
serializer,
@@ -382,6 +385,13 @@ object SparkEnv extends Logging {
metricsSystem,
shuffleMemoryManager,
conf)

// Add a reference to the tmp dir created by the driver
if (isDriver) {
envInstance.tmpFilesDir = Some(sparkFilesDir)
}

envInstance
}

/**
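For reference, here is a minimal, self-contained sketch of the pattern this diff adopts: an Option-typed member records the temp dir only when the driver created it, and stop() pattern-matches on that member so executors skip the deletion entirely. Env, Env.create, and deleteRecursively below are hypothetical stand-ins for illustration, not Spark's actual API.

import java.io.File
import java.nio.file.Files

class Env private () {
  // None on executors; Some(path) only on the driver, set after construction.
  private var tmpFilesDir: Option[String] = None

  def stop(): Unit = {
    tmpFilesDir match {
      case Some(path) =>
        try {
          deleteRecursively(new File(path))
        } catch {
          case e: Exception =>
            System.err.println(s"Exception while deleting temp dir: $path ($e)")
        }
      case None => // executor: its working dir is owned by the cluster manager
    }
  }

  // Simple recursive delete; stands in for Spark's Utils.deleteRecursively.
  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    f.delete()
  }
}

object Env {
  def create(isDriver: Boolean): Env = {
    val env = new Env()
    if (isDriver) {
      // Mirrors the commit: only the driver records the tmp dir it created.
      val dir = Files.createTempDirectory("userFiles").toFile.getAbsolutePath
      env.tmpFilesDir = Some(dir)
    }
    env
  }
}

With this shape, Env.create(isDriver = true).stop() removes the directory the driver created, while an executor-side Env leaves its working directory untouched, which is exactly what the None branch in the commit documents.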
