Commit

Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
yhuai committed Feb 10, 2015
2 parents d91ecb8 + 6cc96cf commit 22cfa70
Showing 63 changed files with 2,480 additions and 728 deletions.
83 changes: 77 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,6 +18,7 @@
 package org.apache.spark

 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean

 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
@@ -67,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
-    settings.put(key, value)
+    settings.put(translateConfKey(key, warn = true), value)
     this
   }

@@ -139,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    settings.putIfAbsent(key, value)
+    settings.putIfAbsent(translateConfKey(key, warn = true), value)
     this
   }

@@ -175,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(key))
+    Option(settings.get(translateConfKey(key)))
   }

   /** Get all parameters as a list of pairs */
@@ -228,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def getAppId: String = get("spark.app.id")

   /** Does the configuration contain a given parameter? */
-  def contains(key: String): Boolean = settings.containsKey(key)
+  def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))

   /** Copy this object */
   override def clone: SparkConf = {
@@ -285,7 +286,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     // Validate memory fractions
     val memoryKeys = Seq(
       "spark.storage.memoryFraction",
-      "spark.shuffle.memoryFraction", 
+      "spark.shuffle.memoryFraction",
       "spark.shuffle.safetyFraction",
       "spark.storage.unrollFraction",
       "spark.storage.safetyFraction")
@@ -351,9 +352,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def toDebugString: String = {
     getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
+
 }

-private[spark] object SparkConf {
+private[spark] object SparkConf extends Logging {
+
+  private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
+    val configs = Seq(
+      DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
+        "1.3"),
+      DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
+        "Use spark.{driver,executor}.userClassPathFirst instead."))
+    configs.map { x => (x.oldName, x) }.toMap
+  }
+
   /**
    * Return whether the given config is an akka config (e.g. akka.actor.provider).
    * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
@@ -380,4 +392,63 @@ private[spark] object SparkConf {
   def isSparkPortConf(name: String): Boolean = {
     (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
   }
+
+  /**
+   * Translate the configuration key if it is deprecated and has a replacement, otherwise just
+   * return the provided key.
+   *
+   * @param userKey Configuration key from the user / caller.
+   * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
+   *             only once for each key.
+   */
+  def translateConfKey(userKey: String, warn: Boolean = false): String = {
+    deprecatedConfigs.get(userKey)
+      .map { deprecatedKey =>
+        if (warn) {
+          deprecatedKey.warn()
+        }
+        deprecatedKey.newName.getOrElse(userKey)
+      }.getOrElse(userKey)
+  }
+
+  /**
+   * Holds information about keys that have been deprecated or renamed.
+   *
+   * @param oldName Old configuration key.
+   * @param _newName New configuration key, or `null` if the key has no replacement, in which
+   *                 case the deprecated key will continue to be used (but the warning message
+   *                 will still be printed).
+   * @param version Version of Spark where the key was deprecated.
+   * @param deprecationMessage Message to include in the deprecation warning; mandatory when
+   *                           `_newName` is not provided.
+   */
+  private case class DeprecatedConfig(
+      oldName: String,
+      _newName: String,
+      version: String,
+      deprecationMessage: String = null) {
+
+    private val warned = new AtomicBoolean(false)
+    val newName = Option(_newName)
+
+    if (newName.isEmpty && (deprecationMessage == null || deprecationMessage.isEmpty())) {
+      throw new IllegalArgumentException("Need new config name or deprecation message.")
+    }
+
+    def warn(): Unit = {
+      if (warned.compareAndSet(false, true)) {
+        if (newName.isDefined) {
+          val message = Option(deprecationMessage).getOrElse(
+            s"Please use the alternative '${newName.get}' instead.")
+          logWarning(
+            s"The configuration option '$oldName' has been replaced as of Spark $version and " +
+            s"may be removed in the future. $message")
+        } else {
+          logWarning(
+            s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
+            s"may be removed in the future. $deprecationMessage")
+        }
+      }
+    }
+
+  }
 }
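The net effect of the SparkConf change: both reads and writes now pass through translateConfKey, so a value set under a deprecated key is transparently stored and looked up under its replacement. A minimal sketch of the resulting behavior, assuming only the deprecation mapping added above (the calling code is illustrative, not part of this commit):

  import org.apache.spark.SparkConf

  val conf = new SparkConf(loadDefaults = false)
  // Writing through the deprecated key logs a one-time warning and
  // stores the value under the replacement key.
  conf.set("spark.files.userClassPathFirst", "true")
  assert(conf.get("spark.executor.userClassPathFirst") == "true")
  // Reads through the deprecated key are translated the same way.
  assert(conf.getOption("spark.files.userClassPathFirst") == Some("true"))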
19 changes: 18 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -17,12 +17,13 @@

 package org.apache.spark

-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
 import java.net.{URI, URL}
 import java.util.jar.{JarEntry, JarOutputStream}

 import scala.collection.JavaConversions._

+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}

@@ -59,6 +60,22 @@ private[spark] object TestUtils {
     createJar(files1 ++ files2, jarFile)
   }

+  /**
+   * Create a jar file containing multiple files. The `files` map contains a mapping of
+   * file names in the jar file to their contents.
+   */
+  def createJarWithFiles(files: Map[String, String], dir: File = null): URL = {
+    val tempDir = Option(dir).getOrElse(Utils.createTempDir())
+    val jarFile = File.createTempFile("testJar", ".jar", tempDir)
+    val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
+    files.foreach { case (k, v) =>
+      val entry = new JarEntry(k)
+      jarStream.putNextEntry(entry)
+      ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream)
+    }
+    jarStream.close()
+    jarFile.toURI.toURL
+  }
+
   /**
    * Create a jar file that contains this set of files. All files will be located at the root
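For orientation, a hedged sketch of how a test might use the new helper (the entry name and contents are invented; TestUtils is private[spark], so this only compiles inside the org.apache.spark package):

  import java.net.URLClassLoader
  import org.apache.spark.TestUtils

  // Build a jar holding one entry named "test.resource" with body "HELLO",
  // then read it back through a class loader.
  val jarUrl = TestUtils.createJarWithFiles(Map("test.resource" -> "HELLO"))
  val loader = new URLClassLoader(Array(jarUrl), null)
  assert(loader.getResourceAsStream("test.resource") != null)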
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -68,8 +68,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
       .map(Utils.splitCommandString).getOrElse(Seq.empty)
     val sparkJavaOpts = Utils.sparkJavaOpts(conf)
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
-    val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
-      driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)
+    val command = new Command(mainClass,
+      Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
+      sys.env, classPathEntries, libraryPathEntries, javaOpts)

     val driverDescription = new DriverDescription(
       driverArgs.jarUrl,
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -37,7 +37,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

 import org.apache.spark.deploy.rest._
 import org.apache.spark.executor._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

 /**
  * Whether to submit, kill, or request the status of an application.
@@ -467,11 +467,11 @@ object SparkSubmit {
     }

     val loader =
-      if (sysProps.getOrElse("spark.files.userClassPathFirst", "false").toBoolean) {
-        new ChildExecutorURLClassLoader(new Array[URL](0),
+      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+        new ChildFirstURLClassLoader(new Array[URL](0),
           Thread.currentThread.getContextClassLoader)
       } else {
-        new ExecutorURLClassLoader(new Array[URL](0),
+        new MutableURLClassLoader(new Array[URL](0),
           Thread.currentThread.getContextClassLoader)
       }
     Thread.currentThread.setContextClassLoader(loader)
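The renamed loaders make the driver-side behavior explicit: MutableURLClassLoader keeps the usual parent-first delegation, while ChildFirstURLClassLoader consults the user's jars before Spark's own classpath. A simplified sketch of the child-first idea, not the actual org.apache.spark.util implementation:

  import java.net.{URL, URLClassLoader}

  // Parent is set to null so super.loadClass searches only our URLs (plus
  // the bootstrap loader); the real parent is consulted only as a fallback.
  class ChildFirstSketch(urls: Array[URL], realParent: ClassLoader)
    extends URLClassLoader(urls, null) {
    override def loadClass(name: String, resolve: Boolean): Class[_] = {
      try {
        super.loadClass(name, resolve)
      } catch {
        case _: ClassNotFoundException => realParent.loadClass(name)
      }
    }
  }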
@@ -196,7 +196,7 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       <td sorttable_customkey={driver.desc.mem.toString}>
         {Utils.megabytesToString(driver.desc.mem.toLong)}
       </td>
-      <td>{driver.desc.command.arguments(1)}</td>
+      <td>{driver.desc.command.arguments(2)}</td>
     </tr>
   }
 }
@@ -392,7 +392,7 @@ private class SubmitRequestServlet(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = new Command(
       "org.apache.spark.deploy.worker.DriverWrapper",
-      Seq("{{WORKER_URL}}", mainClass) ++ appArgs, // args to the DriverWrapper
+      Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
       environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
     val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
     val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
@@ -74,10 +74,15 @@ private[spark] class DriverRunner(
       val driverDir = createWorkingDirectory()
       val localJarFilename = downloadUserJar(driverDir)

+      // Make sure user application jar is on the classpath
+      def substituteVariables(argument: String): String = argument match {
+        case "{{WORKER_URL}}" => workerUrl
+        case "{{USER_JAR}}" => localJarFilename
+        case other => other
+      }
+
       // TODO: If we add ability to submit multiple jars they should also be added here
       val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
-        sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename))
+        sparkHome.getAbsolutePath, substituteVariables)
       launchDriver(builder, driverDir, driverDesc.supervise)
     }
     catch {
@@ -111,12 +116,6 @@ private[spark] class DriverRunner(
     }
   }

-  /** Replace variables in a command argument passed to us */
-  private def substituteVariables(argument: String): String = argument match {
-    case "{{WORKER_URL}}" => workerUrl
-    case other => other
-  }
-
   /**
    * Creates the working directory for this driver.
    * Will throw an exception if there are errors preparing the directory.
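The worker maps every command argument through that local substituteVariables closure before building the process, which is how the downloaded jar's path replaces the {{USER_JAR}} placeholder. A small illustration with invented values (the real call goes through CommandUtils.buildProcessBuilder):

  val workerUrl = "akka://sparkWorker@192.168.1.10:7078/user/Worker" // invented
  val localJarFilename = "/work/driver-20150210/app.jar"             // invented

  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  val raw = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "com.example.Main", "--arg1")
  val substituted = raw.map(substituteVariables)
  // substituted == Seq(workerUrl, localJarFilename, "com.example.Main", "--arg1")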
@@ -17,32 +17,44 @@

 package org.apache.spark.deploy.worker

+import java.io.File
+
 import akka.actor._

 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

 /**
  * Utility object for launching driver programs such that they share fate with the Worker process.
  */
 object DriverWrapper {
   def main(args: Array[String]) {
     args.toList match {
-      case workerUrl :: mainClass :: extraArgs =>
+      case workerUrl :: userJar :: mainClass :: extraArgs =>
         val conf = new SparkConf()
         val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
           Utils.localHostName(), 0, conf, new SecurityManager(conf))
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")

+        val currentLoader = Thread.currentThread.getContextClassLoader
+        val userJarUrl = new File(userJar).toURI().toURL()
+        val loader =
+          if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+            new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
+          } else {
+            new MutableURLClassLoader(Array(userJarUrl), currentLoader)
+          }
+        Thread.currentThread.setContextClassLoader(loader)
+
         // Delegate to supplied main class
-        val clazz = Class.forName(args(1))
+        val clazz = Class.forName(mainClass, true, loader)
         val mainMethod = clazz.getMethod("main", classOf[Array[String]])
         mainMethod.invoke(null, extraArgs.toArray[String])

         actorSystem.shutdown()

       case _ =>
-        System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]")
+        System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
         System.exit(-1)
     }
   }
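With the master-side Command now carrying {{USER_JAR}} and the worker substituting it, DriverWrapper receives its arguments in the new order. A runnable sketch of how the new pattern match decomposes them (all values invented):

  val args = Array(
    "akka://sparkWorker@192.168.1.10:7078/user/Worker", // workerUrl
    "/work/driver-20150210/app.jar",                    // userJar
    "com.example.Main",                                 // driver main class
    "--arg1")                                           // extra args

  args.toList match {
    case workerUrl :: userJar :: mainClass :: extraArgs =>
      println(s"worker=$workerUrl jar=$userJar main=$mainClass extra=$extraArgs")
    case _ =>
      println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
  }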
