[SPARK-27369][CORE] Setup resources when Standalone Worker starts up
## What changes were proposed in this pull request?

To support GPU-aware scheduling in Standalone (cluster mode), the Worker should be able to set up resources (e.g. GPU/FPGA) when it starts up.

Similar to what the driver/executor do, the Worker has two ways (a resources file and a resource discovery script) to set up resources when it starts up. Users can apply resource configs to the Worker via `SPARK_WORKER_OPTS` in the form of `-Dx=y`. For example,
```
SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=2 \
                   -Dspark.worker.resource.fpga.amount=1 \
                   -Dspark.worker.resource.fpga.discoveryScript=/Users/wuyi/tmp/getFPGAResources.sh \
                   -Dspark.worker.resourcesFile=/Users/wuyi/tmp/worker-resource-file"
```
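For reference, a resources file like the `worker-resource-file` above might look as follows. This is a minimal sketch, assuming the ResourceAllocation JSON format referenced by the new `spark.worker.resourcesFile` config; the GPU addresses are illustrative:
```json
[
  {
    "id": {"componentName": "spark.worker", "resourceName": "gpu"},
    "addresses": ["0", "1"]
  }
]
```
A discovery script such as `getFPGAResources.sh` is expected to print a single ResourceInformation JSON object (resource name plus addresses) to stdout. A hypothetical sketch that hard-codes one FPGA rather than probing real hardware:
```bash
#!/usr/bin/env bash
# Report one FPGA with a made-up address; a real script would query the device.
echo '{"name": "fpga", "addresses": ["fpga0"]}'
```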
## How was this patch tested?

Tested manually in a local Standalone deployment (a start-up sketch follows the list):

- Worker starts up normally when no resources are configured

- Worker fails to start up when an exception is thrown during resource setup (e.g. unknown directory, parse failure)

- Worker can set up resources from the resources file

- Worker can set up resources from discovery scripts

- Worker sets up resources from both the resources file & discovery scripts when both are configured
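For a manual run along these lines, one might export the configs and start a worker against a local master. This is a hypothetical sketch, assuming a master already running at `spark://localhost:7077` and the file paths from the example above:
```bash
# Apply the resource configs to the worker, then launch it (Spark 2.x/3.0-era script name).
export SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=2 \
                          -Dspark.worker.resourcesFile=/Users/wuyi/tmp/worker-resource-file"
./sbin/start-slave.sh spark://localhost:7077
```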

Closes #24841 from Ngone51/dev-worker-resources-setup.

Authored-by: wuyi <[email protected]>
Signed-off-by: Xingbo Jiang <[email protected]>
Ngone51 authored and jiangxb1987 committed Jun 27, 2019
1 parent c277afb commit 7cbe01e
Showing 3 changed files with 34 additions and 4 deletions.
25 changes: 22 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
```diff
@@ -41,6 +41,8 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}

@@ -54,6 +56,7 @@ private[deploy] class Worker(
     workDirPath: String = null,
     val conf: SparkConf,
     val securityMgr: SecurityManager,
+    resourceFileOpt: Option[String] = None,
     externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
   extends ThreadSafeRpcEndpoint with Logging {

@@ -176,6 +179,9 @@
     masterRpcAddresses.length // Make sure we can register with all masters at the same time
   )

+  // visible for tests
+  private[deploy] var resources: Map[String, ResourceInformation] = _
+
   var coresUsed = 0
   var memoryUsed = 0

@@ -208,6 +214,7 @@
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     startExternalShuffleService()
+    setupWorkerResources()
     webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()

@@ -220,6 +227,16 @@
     metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }

+  private def setupWorkerResources(): Unit = {
+    try {
+      resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, resourceFileOpt)
+    } catch {
+      case e: Exception =>
+        logError("Failed to setup worker resources: ", e)
+        System.exit(1)
+    }
+  }
+
   /**
    * Change to use the new master.
    *

@@ -785,7 +802,8 @@ private[deploy] object Worker extends Logging {
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
     val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
-      args.memory, args.masters, args.workDir, conf = conf)
+      args.memory, args.masters, args.workDir, conf = conf,
+      resourceFileOpt = conf.get(SPARK_WORKER_RESOURCE_FILE))
     // With external shuffle service enabled, if we request to launch multiple workers on one host,
     // we can only successfully launch the first worker and the rest fails, because with the port
     // bound, we may launch no more than one external shuffle service on each host.

@@ -809,15 +827,16 @@
       masterUrls: Array[String],
       workDir: String,
       workerNumber: Option[Int] = None,
-      conf: SparkConf = new SparkConf): RpcEnv = {
+      conf: SparkConf = new SparkConf,
+      resourceFileOpt: Option[String] = None): RpcEnv = {

     // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
     val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
     val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
     rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
-      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
+      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))
     rpcEnv
   }
```
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/Worker.scala
```diff
@@ -20,6 +20,17 @@ package org.apache.spark.internal.config
 import java.util.concurrent.TimeUnit

 private[spark] object Worker {
+  val SPARK_WORKER_PREFIX = "spark.worker"
+
+  val SPARK_WORKER_RESOURCE_FILE =
+    ConfigBuilder("spark.worker.resourcesFile")
+      .internal()
+      .doc("Path to a file containing the resources allocated to the worker. " +
+        "The file should be formatted as a JSON array of ResourceAllocation objects. " +
+        "Only used internally in standalone mode.")
+      .stringConf
+      .createOptional
+
   val WORKER_TIMEOUT = ConfigBuilder("spark.worker.timeout")
     .longConf
     .createWithDefault(60)
```
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
```diff
@@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
     _worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, securityMgr, shuffleServiceSupplier)
+      "Worker", "/tmp", conf, securityMgr, None, shuffleServiceSupplier)
     _worker
   }
```
