[SPARK-2260] Fix standalone-cluster mode, which was broken
The main issue was that Spark configs were not propagated to the driver, so applications that do not explicitly specify `master` or `appName` failed outright. This PR fixes that, along with a few related miscellaneous issues.

One thing that may or may not be an issue is that the jars must be available on the driver node. In `standalone-cluster` mode, this effectively means the jars must be available on all the worker machines, since the driver is launched on one of them. The semantics here are not the same as in `yarn-cluster` mode, where all the relevant jars are automatically uploaded to a distributed cache and shipped to the containers. This is probably not a concern, but it is still worth mentioning.
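As a rough illustration, the kind of application affected is one that leaves `master` and `appName` for `spark-submit` to supply through `SparkConf`. A minimal sketch (the object name and the trivial job are hypothetical, not part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Relies on spark-submit to provide spark.master and spark.app.name.
// Before this fix, a driver launched in standalone-cluster mode never
// received those properties, so creating the SparkContext failed.
object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()       // no setMaster / setAppName here
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).count())
    sc.stop()
  }
}
```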

Author: Andrew Or <[email protected]>

Closes #1538 from andrewor14/standalone-cluster and squashes the following commits:

8c11a0d [Andrew Or] Clean up imports / comments (minor)
2678d13 [Andrew Or] Handle extraJavaOpts properly
7660547 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-cluster
6f64a9b [Andrew Or] Revert changes in YARN
2f2908b [Andrew Or] Fix tests
ed01491 [Andrew Or] Don't go overboard with escaping
8e105e1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-cluster
b890949 [Andrew Or] Abstract usages of converting spark opts to java opts
79f63a3 [Andrew Or] Move sparkProps into javaOpts
78752f8 [Andrew Or] Fix tests
5a9c6c7 [Andrew Or] Fix line too long
c141a00 [Andrew Or] Don't display "unknown app" on driver log pages
d7e2728 [Andrew Or] Avoid deprecation warning in standalone Client
6ceb14f [Andrew Or] Allow relevant configs to propagate to standalone Driver
7f854bc [Andrew Or] Fix test
855256e [Andrew Or] Fix standalone-cluster mode
fd9da51 [Andrew Or] Formatting changes (minor)
andrewor14 authored and pwendell committed Jul 30, 2014
1 parent 077f633 commit 4ce92cc
Showing 16 changed files with 93 additions and 51 deletions.
22 changes: 21 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -40,6 +40,8 @@ import scala.collection.mutable.HashMap
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

import SparkConf._

/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)

@@ -198,7 +200,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
*
* E.g. spark.akka.option.x.y.x = "value"
*/
getAll.filter {case (k, v) => k.startsWith("akka.")}
getAll.filter { case (k, _) => isAkkaConf(k) }

/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
@@ -292,3 +294,21 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}
}

private[spark] object SparkConf {
/**
* Return whether the given config is an akka config (e.g. akka.actor.provider).
* Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
*/
def isAkkaConf(name: String): Boolean = name.startsWith("akka.")

/**
* Return whether the given config should be passed to an executor on start-up.
*
* Certain akka and authentication configs are required of the executor when it connects to
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
*/
def isExecutorStartupConf(name: String): Boolean = {
isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
}
}
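A hypothetical sketch of how the new predicates classify config keys; since the `SparkConf` companion object is `private[spark]`, the snippet assumes it is compiled inside the `org.apache.spark` package tree:

```scala
package org.apache.spark  // the helpers below are private[spark]

object StartupConfSketch {
  def main(args: Array[String]): Unit = {
    // Raw akka configs vs. spark-specific akka configs:
    println(SparkConf.isAkkaConf("akka.actor.provider"))            // true
    println(SparkConf.isAkkaConf("spark.akka.timeout"))             // false
    // Configs an executor needs before it registers with the scheduler:
    println(SparkConf.isExecutorStartupConf("spark.akka.timeout"))  // true
    println(SparkConf.isExecutorStartupConf("spark.authenticate"))  // true ("spark.auth" prefix)
    println(SparkConf.isExecutorStartupConf("spark.app.name"))      // false, inherited from the driver later
  }
}
```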
21 changes: 11 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,8 +17,6 @@

package org.apache.spark.deploy

import scala.collection.JavaConversions._
import scala.collection.mutable.Map
import scala.concurrent._

import akka.actor._
@@ -50,9 +48,6 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
// truncate filesystem paths similar to what YARN does. For now, we just require
// people call `addJar` assuming the jar is in the same directory.
val env = Map[String, String]()
System.getenv().foreach{case (k, v) => env(k) = v}

val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

val classPathConf = "spark.driver.extraClassPath"
@@ -65,10 +60,13 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
cp.split(java.io.File.pathSeparator)
}

val javaOptionsConf = "spark.driver.extraJavaOptions"
val javaOpts = sys.props.get(javaOptionsConf)
val extraJavaOptsConf = "spark.driver.extraJavaOptions"
val extraJavaOpts = sys.props.get(extraJavaOptsConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)

val driverDescription = new DriverDescription(
driverArgs.jarUrl,
@@ -109,6 +107,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
// Exception, if present
statusResponse.exception.map { e =>
println(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
}
System.exit(0)
@@ -141,8 +140,10 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
*/
object Client {
def main(args: Array[String]) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
}

val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
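The net effect of the client change is that every Spark property the client knows about reaches the driver JVM as a `-D` system property, alongside any `spark.driver.extraJavaOptions`. A rough standalone sketch of that composition, with hypothetical values (it mirrors the code above rather than reusing it, and assumes an in-tree build because `Utils` is `private[spark]`):

```scala
package org.apache.spark.deploy  // Utils is private[spark]

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

object DriverJavaOptsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .setMaster("spark://host:7077")   // hypothetical values
      .setAppName("MyApp")

    // Extra driver options come from system properties, split shell-style.
    val extraJavaOpts = sys.props.get("spark.driver.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    // Every Spark property becomes a -D flag for the driver JVM.
    val javaOpts = Utils.sparkJavaOpts(conf) ++ extraJavaOpts

    // Prints -Dspark.master=spark://host:7077 and -Dspark.app.name=MyApp
    javaOpts.foreach(println)
  }
}
```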
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -25,5 +25,5 @@ private[spark] case class Command(
environment: Map[String, String],
classPathEntries: Seq[String],
libraryPathEntries: Seq[String],
extraJavaOptions: Option[String] = None) {
javaOpts: Seq[String]) {
}
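`Command` now carries its JVM options as a pre-split `Seq[String]` instead of an optional raw string. A sketch of constructing one under the new signature (all values hypothetical; `Command` is `private[spark]`, so this assumes an in-tree build):

```scala
package org.apache.spark.deploy

object CommandSketch {
  def main(args: Array[String]): Unit = {
    // Positional arguments: mainClass, arguments, environment,
    // classPathEntries, libraryPathEntries, javaOpts.
    val cmd = Command(
      "org.apache.spark.deploy.worker.DriverWrapper",
      Seq("{{WORKER_URL}}", "org.example.MyApp"),  // hypothetical user main class
      sys.env,
      Seq.empty,
      Seq.empty,
      Seq("-Dspark.master=spark://host:7077", "-Dspark.app.name=MyApp"))
    println(cmd.javaOpts.mkString(" "))
  }
}
```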
12 changes: 5 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -136,8 +136,6 @@ object SparkSubmit {
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
case (STANDALONE, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
case (_, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -170,9 +168,9 @@ object SparkSubmit {
val options = List[OptionAssigner](

// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),

// Standalone cluster only
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
@@ -203,9 +201,9 @@ object SparkSubmit {
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files")
core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -46,11 +46,11 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
val conf = new SparkConf
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = conf, securityManager = new SecurityManager(conf))
val desc = new ApplicationDescription(
"TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
Seq()), Some("dummy-spark-home"), "ignored")
"TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -47,7 +47,6 @@ object CommandUtils extends Logging {
*/
def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq())

// Exists for backwards compatibility with older Spark versions
val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
@@ -62,7 +61,7 @@ object CommandUtils extends Logging {
val joined = command.libraryPathEntries.mkString(File.pathSeparator)
Seq(s"-Djava.library.path=$joined")
} else {
Seq()
Seq()
}

val permGenOpt = Seq("-XX:MaxPermSize=128m")
@@ -71,11 +70,11 @@ object CommandUtils extends Logging {
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
val classPath = Utils.executeAndGetOutput(
Seq(sparkHome + "/bin/compute-classpath" + ext),
extraEnvironment=command.environment)
extraEnvironment = command.environment)
val userClassPath = command.classPathEntries ++ Seq(classPath)

Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts
permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
}

/** Spawn a thread that will redirect a given stream to a file */
core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState

/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
* This is currently only used in standalone cluster deploy mode.
*/
private[spark] class DriverRunner(
val driverId: String,
@@ -81,7 +82,7 @@ private[spark] class DriverRunner(
driverDesc.command.environment,
classPath,
driverDesc.command.libraryPathEntries,
driverDesc.command.extraJavaOptions)
driverDesc.command.javaOpts)
val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
sparkHome.getAbsolutePath)
launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender

/**
* Manages the execution of one executor process.
* This is currently only used in standalone mode.
*/
private[spark] class ExecutorRunner(
val appId: String,
@@ -72,7 +73,7 @@ private[spark] class ExecutorRunner(
}

/**
* kill executor process, wait for exit and notify worker to update resource status
* Kill executor process, wait for exit and notify worker to update resource status.
*
* @param message the exception message which caused the executor's death
*/
@@ -114,10 +115,13 @@ private[spark] class ExecutorRunner(
}

def getCommandSeq = {
val command = Command(appDesc.command.mainClass,
appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment,
appDesc.command.classPathEntries, appDesc.command.libraryPathEntries,
appDesc.command.extraJavaOptions)
val command = Command(
appDesc.command.mainClass,
appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
appDesc.command.environment,
appDesc.command.classPathEntries,
appDesc.command.libraryPathEntries,
appDesc.command.javaOpts)
CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
}

core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -17,15 +17,14 @@

package org.apache.spark.deploy.worker.ui

import java.io.File
import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
import org.apache.spark.Logging
import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}
import org.apache.spark.util.logging.RollingFileAppender

private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
private val worker = parent.worker
@@ -64,11 +63,11 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)

val (logDir, params) = (appId, executorId, driverId) match {
val (logDir, params, pageName) = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
(s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
(s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e", s"$a/$e")
case (None, None, Some(d)) =>
(s"${workDir.getPath}/$d/", s"driverId=$d")
(s"${workDir.getPath}/$d/", s"driverId=$d", d)
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}
@@ -120,7 +119,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
</div>
</body>
</html>
UIUtils.basicSparkPage(content, logType + " log page for " + appId.getOrElse("unknown app"))
UIUtils.basicSparkPage(content, logType + " log page for " + pageName)
}

/** Get the part of the log files given the offset and desired length of bytes */
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -98,8 +98,13 @@ private[spark] class CoarseGrainedExecutorBackend(
}

private[spark] object CoarseGrainedExecutorBackend extends Logging {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
workerUrl: Option[String]) {

private def run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
workerUrl: Option[String]) {

SignalLogger.register(log)

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler.cluster

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -46,6 +46,7 @@ private[spark] class SparkDeploySchedulerBackend(
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
@@ -54,9 +55,11 @@ private[spark] class SparkDeploySchedulerBackend(
cp.split(java.io.File.pathSeparator)
}

val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
classPathEntries, libraryPathEntries, extraJavaOpts)
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1313,4 +1313,13 @@ private[spark] object Utils extends Logging {
s"$className: $desc\n$st"
}

/**
* Convert all spark properties set in the given SparkConf to a sequence of java options.
*/
def sparkJavaOpts(conf: SparkConf, filterKey: (String => Boolean) = _ => true): Seq[String] = {
conf.getAll
.filter { case (k, _) => filterKey(k) }
.map { case (k, v) => s"-D$k=$v" }
}

}
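`Utils.sparkJavaOpts` also accepts a key filter, which is how the `SparkDeploySchedulerBackend` change above forwards only start-up configs to executors. A hedged sketch with hypothetical settings (again assuming an in-tree build, since `Utils` is `private[spark]`):

```scala
package org.apache.spark.util

import org.apache.spark.SparkConf

object SparkJavaOptsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.authenticate", "true")    // hypothetical settings
      .set("spark.akka.frameSize", "10")
      .set("spark.app.name", "MyApp")

    // Everything, as forwarded to a standalone driver:
    println(Utils.sparkJavaOpts(conf).sorted.mkString(" "))
    // -> -Dspark.akka.frameSize=10 -Dspark.app.name=MyApp -Dspark.authenticate=true

    // Only what an executor needs before registering with the scheduler:
    println(Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf).sorted.mkString(" "))
    // -> -Dspark.akka.frameSize=10 -Dspark.authenticate=true
  }
}
```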
core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -88,7 +88,7 @@ class JsonProtocolSuite extends FunSuite {
}

def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq())
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
}

@@ -101,7 +101,7 @@

def createDriverCommand() = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo")
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
)

def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
@@ -170,7 +170,7 @@ object JsonConstants {
"""
|{"name":"name","cores":4,"memoryperslave":1234,
|"user":"%s","sparkhome":"sparkHome",
|"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),None)"}
|"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
""".format(System.getProperty("user.name", "<unknown>")).stripMargin

val executorRunnerJsonStr =
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -200,9 +200,12 @@ class SparkSubmitSuite extends FunSuite with Matchers {
childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2")
mainClass should be ("org.apache.spark.deploy.Client")
classpath should have size (0)
sysProps should have size (3)
sysProps.keys should contain ("spark.jars")
sysProps should have size (5)
sysProps.keys should contain ("SPARK_SUBMIT")
sysProps.keys should contain ("spark.master")
sysProps.keys should contain ("spark.app.name")
sysProps.keys should contain ("spark.jars")
sysProps.keys should contain ("spark.shuffle.spill")
sysProps("spark.shuffle.spill") should be ("false")
}

core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription}

class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
val command = new Command("mainClass", Seq(), Map(), Seq(), Seq())
val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
null, "akka://1.2.3.4/worker/")
core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -29,7 +29,7 @@ class ExecutorRunnerTest extends FunSuite {
def f(s:String) = new File(s)
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(), Map(), Seq(), Seq()),
Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
