[SPARK-5801] [core] Avoid creating nested directories.
Cache the value of the local root dirs to use for storing local data,
so that the same directory is reused.

Also, to avoid an extra level of nesting, use a different env variable
to propagate the local dirs from the Worker to the executors. And make
the executor directory use a different name.
Marcelo Vanzin committed Feb 24, 2015
1 parent 0a59e45 commit 18ee0a7
Showing 3 changed files with 17 additions and 3 deletions.
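
Context for the fix: before this commit, each call that resolved the local root dirs minted a fresh random subdirectory, and since the Worker handed its result to executors via SPARK_LOCAL_DIRS, each executor nested yet another level beneath it. A minimal Scala sketch of that uncached behavior (illustrative only, not Spark's actual code; names and paths are made up):

import java.io.File
import java.util.UUID

// Illustrative helper that mints a fresh random subdirectory on every call.
// Two calls in the same process yield two different "root" dirs, and a child
// process told to treat the result as its local dir nests another level.
def uncachedLocalRootDir(root: String): String = {
  val dir = new File(root, "spark-" + UUID.randomUUID().toString)
  dir.mkdirs()
  dir.getAbsolutePath
}

val first = uncachedLocalRootDir("/tmp")
val second = uncachedLocalRootDir("/tmp")
assert(first != second) // same process, two directories; the commit caches the value instead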
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -135,7 +135,7 @@ private[spark] class ExecutorRunner(
       logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
 
       builder.directory(executorDir)
-      builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
+      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
       // In case we are running this from within the Spark Shell, avoid creating a "scala"
       // parent process for the executor command
       builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
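
A side note on the new join: the directory list is now joined with File.pathSeparator (":" on Unix, ";" on Windows) instead of a hard-coded comma, matching how the receiving side splits it. A quick round-trip sketch (the paths are hypothetical):

import java.io.File

val dirs = Seq("/data1/app-1/executor-a", "/data2/app-1/executor-b") // made-up paths
val joined = dirs.mkString(File.pathSeparator)        // what ExecutorRunner puts in the env
val restored = joined.split(File.pathSeparator).toSeq // what the executor side does
assert(restored == dirs)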
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -345,11 +345,11 @@ private[spark] class Worker(
       }
 
       // Create local dirs for the executor. These are passed to the executor via the
-      // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
       // application finishes.
       val appLocalDirs = appDirectories.get(appId).getOrElse {
         Utils.getOrCreateLocalRootDirs(conf).map { dir =>
-          Utils.createDirectory(dir).getAbsolutePath()
+          Utils.createDirectory(dir, namePrefix = "executor").getAbsolutePath()
         }.toSeq
       }
       appDirectories(appId) = appLocalDirs
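
For context, a createDirectory-style helper typically retries creation under the given root with a randomized name, and the namePrefix argument added here only changes that name (so executor dirs read "executor-..." instead of "spark-..."). A hedged sketch of such a helper, simplified from what Spark actually does; the retry bound and naming are illustrative:

import java.io.{File, IOException}
import java.util.UUID

// Simplified sketch: try a bounded number of randomized names under `root`,
// returning the first candidate that mkdirs() successfully creates.
def createDirectorySketch(root: String, namePrefix: String = "spark"): File = {
  val maxAttempts = 10
  var attempts = 0
  var dir: File = null
  while (dir == null) {
    attempts += 1
    if (attempts > maxAttempts) {
      throw new IOException(
        s"Failed to create a directory under $root after $maxAttempts attempts")
    }
    val candidate = new File(root, s"$namePrefix-${UUID.randomUUID()}")
    if (candidate.mkdirs()) {
      dir = candidate
    }
  }
  dir
}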
core/src/main/scala/org/apache/spark/util/Utils.scala (14 additions & 0 deletions)
@@ -63,6 +63,7 @@ private[spark] object Utils extends Logging {
   val random = new Random()
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+  @volatile private var localRootDirs: Array[String] = null
 
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
@@ -679,12 +680,25 @@ private[spark] object Utils extends Logging {
    * If no directories could be created, this will return an empty list.
    */
   private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+    if (localRootDirs == null) {
+      this.synchronized {
+        if (localRootDirs == null) {
+          localRootDirs = getOrCreateLocalRootDirsImpl(conf)
+        }
+      }
+    }
+    localRootDirs
+  }
+
+  private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
     if (isRunningInYarnContainer(conf)) {
       // If we are in yarn mode, systems can have different disk layouts so we must set it
       // to what Yarn on this system said was available. Note this assumes that Yarn has
       // created the directories already, and that they are secured so that only the
       // user has access to them.
       getYarnLocalDirs(conf).split(",")
+    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
+      conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
     } else {
       // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
       // configuration to point to a secure directory. So create a subdirectory with restricted
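
Why the caching above is thread-safe: the @volatile on localRootDirs plus the second null check inside synchronized is the standard JVM double-checked locking idiom. The volatile write publishes the fully initialized array to other threads, while the unsynchronized first read keeps the common case lock-free. The same idiom in isolation (a generic sketch, not Spark code):

object OnceCache {
  @volatile private var cached: Array[String] = null

  def getOrCompute(compute: () => Array[String]): Array[String] = {
    if (cached == null) {            // fast path: volatile read, no lock taken
      synchronized {
        if (cached == null) {        // re-check: another thread may have won the race
          cached = compute()         // volatile write publishes the result safely
        }
      }
    }
    cached
  }
}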
