[SPARK-27369][CORE] Setup resources when Standalone Worker starts up #24841
Conversation
Test build #106387 has finished for PR 24841 at commit
PR #24821 is adding an abstraction layer for the resource handling.
Yeah, I noticed that but haven't gone through it yet. In this PR, Worker has similar behavior to driver/executor when setting up resources (roughly: resource file first, discovery script second). So I guess merging this PR into #24821 may just need some similar modifications? And I'm also OK with waiting for #24821, since this PR is the smaller one and easier to update.
@@ -220,6 +225,38 @@ private[deploy] class Worker(
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }

  // TODO if we're starting up multi workers under the same host, discovery script won't work.
How would the resourceFile work under your approach in the scenario of multiple workers launched on the same host? We need to avoid multiple workers accessing the same external resource address.
For the initial version, I have a base assumption here that the user should be responsible for configuring the right resourceFile among multiple workers on the same host.
Technically a discovery script could work, but the writer of that script would have to handle that case inside the script to assign each worker different resources. Either way, we should be sure to document it.
Yeah, technically we can, but I think it would be more troublesome than using resourceFile. Documenting it is good; done in ade97c2.
resources = resourceFile.map { rFile =>
  ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
}.getOrElse {
  if (resourceDiscoveryScript.isEmpty) {
We don't want to reuse the discoveryScript config?
Not sure what you actually mean here, so let me just explain my understanding. resourceDiscoveryScript is configured from SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT or --resource-script, which are specifically for the Worker. And the application (driver/executor) discoveryScript config hasn't been set up yet while the Worker is starting up.
  ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
}.getOrElse {
  if (resourceDiscoveryScript.isEmpty) {
    logWarning(s"Neither resourceFile nor resourceDiscoveryScript found for worker. " +
I'm not sure you want to warn here. If users just aren't configuring Spark to use other resources, there is no reason to warn.
  }
} catch {
  case t: Throwable =>
    logWarning("Failed to setup worker resources: ", t)
logError
    }
  }
} catch {
  case t: Throwable =>
is there a reason we catch all Throwable here? I think just Exception makes more sense.
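A minimal sketch combining this with the logError suggestion above (the setup and logError parameters are stand-ins, not names from the PR):

```scala
import scala.util.control.NonFatal

// Sketch only: catch non-fatal exceptions rather than all Throwables,
// and log at ERROR level instead of WARN.
def trySetupResources(setup: () => Unit, logError: (String, Throwable) => Unit): Unit = {
  try {
    setup()
  } catch {
    // NonFatal deliberately excludes fatal errors such as OutOfMemoryError,
    // which should be allowed to propagate.
    case NonFatal(e) => logError("Failed to setup worker resources: ", e)
  }
}
```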
  resourceFile = Some(System.getenv("SPARK_WORKER_RESOURCE_FILE"))
}
if (System.getenv("SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT") != null) {
  parseResourceDiscoveryScript(conf.getenv("SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT"))
might be easier to read if we have parseResourceDiscoveryScript return a Map and set it here.
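A sketch of that suggestion (the method name comes from the diff above, but the input format and parsing details are assumptions):

```scala
object WorkerArgumentsSketch {
  // Assumed input format: "gpu=/path/to/gpuScript,fpga=/path/to/fpgaScript".
  // Returns the parsed Map instead of mutating state inside the parser.
  def parseResourceDiscoveryScript(scripts: String): Map[String, String] =
    scripts.split(",").map { entry =>
      val Array(name, path) = entry.split("=", 2) // validation omitted for brevity
      name.trim -> path.trim
    }.toMap
}
```

The call site then becomes a plain assignment: resourceDiscoveryScript = parseResourceDiscoveryScript(conf.getenv("SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT")).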
@@ -36,6 +36,8 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
  var memory = inferDefaultMemory()
  var masters: Array[String] = null
  var workDir: String = null
  var resourceFile: Option[String] = None
  var resourceDiscoveryScript: Map[String, String] = Map()
= Map.empty might make it clearer that this is immutable.
@@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
  _worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-    "Worker", "/tmp", conf, securityMgr, shuffleServiceSupplier)
+    "Worker", "/tmp", conf, securityMgr, None, Map.empty, shuffleServiceSupplier)
can we add more tests that actually get the resources?
If it's easier to do with the next JIRA for assignments, that is fine too.
We need to get Master in to test whether Worker sets up the resources correctly, since Worker could not start up without a living Master. Maybe we could cover this in the following JIRA task, which will share resource info between Master and Worker.
sounds fine
Not sure if I understand correctly, but from a quick look at the description, the executor will also set up resources? Doesn't the executor set them up again if the Worker has already set them up?
@viirya IIUC, the executor sets up the resources from what the worker assigned to it. For example, the worker could "split" its own resources into separate resource files according to the Master's requirements for executors. Then, an executor could set up from the corresponding resource file when it starts up.
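For illustration only (the file names and addresses are made up, and the exact JSON shape is not settled in this thread): a worker owning GPU addresses 0-3 might hand two executors separate resource files, e.g. executor-1-resources.json:

```json
{"name": "gpu", "addresses": ["0", "1"]}
```

and executor-2-resources.json:

```json
{"name": "gpu", "addresses": ["2", "3"]}
```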
Test build #106450 has finished for PR 24841 at commit
@Ngone51 Sounds reasonable. So the splitting logic will be implemented in a follow-up?
@viirya Yeah, I think so.
@Ngone51 Could you please rebase the PR so we can continue working on this?
@jiangxb1987 sure, will do it.
Force-pushed from 14203f5 to a1cd955.
Test build #106750 has finished for PR 24841 at commit
Test build #106752 has finished for PR 24841 at commit
Jenkins, retest this please.
Test build #106759 has finished for PR 24841 at commit
ConfigBuilder("spark.worker.resourcesFile")
  .internal()
  .doc("Path to a file containing the resources allocated to the worker. " +
    "The file should be formatted as a JSON array of ResourceInformation objects. " +
I don't think this is true anymore; I think the JSON has to be formatted like a ResourceAllocation.
Thanks for catching this.
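For reference, a hedged sketch of what a ResourceAllocation-formatted resources file would look like (the addresses are made up; the field names follow Spark's resource JSON, i.e. a component/resource id plus its addresses):

```json
[
  {
    "id": {"componentName": "spark.worker", "resourceName": "gpu"},
    "addresses": ["0", "1"]
  }
]
```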
Test build #106842 has finished for PR 24841 at commit
Looks good, this is a straightforward change that enables the Worker to load resource information. Syncing resource information between the master and workers, as well as splitting resources among different workers on the same node, will be addressed in a separate PR.
@@ -41,6 +41,8 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.ResourceUtils.getOrDiscoverAllResources
nit: I would prefer import org.apache.spark.resource.ResourceUtils._.
Test build #106858 has finished for PR 24841 at commit
Two issues:

- If the discovery script output is in a format like {"name": "gpu", "addresses": ["0","1"]}, how do we avoid launching multiple workers on the same worker node (which would cause resource conflicts)? We should add some code to avoid this case.
- If the discovery script dynamically allocates resources for the launching worker, should we limit the script to allocating exactly the requested amount? Allocating more than requested would be wasted, because multiple workers may be launched on the same node.
The current plan is to let the Master play the role of coordinator to avoid resource conflicts. So multiple workers are still allowed to be launched on the same node. But if the available resources can't meet the worker's requested amount, it will fail to launch.
If the Master is a coordinator, then the superfluous resources could be allocated to other workers launched on the same node. If no more workers launch, those resources could be wasted. Another possibility is setting the requested amount to the size of the resources found by the script when the user does not configure the resource amount explicitly (similar to what cpu and memory do). That way, once a worker launches on the node, no other worker (using the same script) could launch on the same node, as the first worker has already grabbed all the resources. This might achieve the goal of issue 1.
Oh, let me clarify my issues clearly: Issue 2 Thanks :)
Just my 2 cents: We may rely on the discovery script to discover all the available resources on the host, but that doesn't mean the worker shall occupy them all. My idea is that the Worker shall send the information of all available resources to the Master on registration, and then the Master shall be responsible for coordination and tell the Worker which addresses it shall use. One tricky thing here is when to recycle the resources. Ideally we shall recycle them when a Worker is lost, but as Tom mentioned, a lost Worker may still be running, so there is a possibility that the resource addresses are still being used. Instead, we shall make the Worker explicitly send a
The Master will keep track of which addresses have been allocated to each Worker, so when a new Worker joins, the Master can detect which addresses are free and make sure to allocate only the free addresses to it.
We don't require the number of addresses detected by the discovery script to be exactly the same as the request, because the assumption is too strong, and the hardware on a node may change over time too (we may add new devices, and devices may break). So we only require the number of discovered addresses to be at least the request, and Spark will ensure we only use the requested number of devices.
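A minimal sketch of the bookkeeping described above, assuming a per-host, per-resource table of allocated addresses (all names here are hypothetical, not from the PR):

```scala
import scala.collection.mutable

// Sketch only: the Master tracks which addresses on each host are already
// allocated and hands out only free ones to newly registering workers.
class HostResourceCoordinator {
  // (host, resource name) -> addresses already granted to workers
  private val allocated = mutable.Map[(String, String), Set[String]]()

  /** Grant a registering worker `requested` free addresses, if available. */
  def allocate(host: String, resource: String,
               discovered: Seq[String], requested: Int): Option[Seq[String]] = {
    val used = allocated.getOrElse((host, resource), Set.empty[String])
    val free = discovered.filterNot(used)
    if (free.size < requested) {
      None // not enough free addresses: the worker fails to launch
    } else {
      val granted = free.take(requested)
      allocated((host, resource)) = used ++ granted
      Some(granted)
    }
  }

  /** Recycle addresses when a worker explicitly releases them. */
  def release(host: String, resource: String, addresses: Seq[String]): Unit = {
    val used = allocated.getOrElse((host, resource), Set.empty[String])
    allocated((host, resource)) = used -- addresses
  }
}
```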
This part is planned to be done in the following PR :) @WeichenXu123
Thanks, merged to master!
Thank you @jiangxb1987 @tgravescs @viirya @WeichenXu123
What changes were proposed in this pull request?
To support GPU-aware scheduling in Standalone (cluster mode), the Worker should have the ability to set up resources (e.g. GPU/FPGA) when it starts up.
Similar to what driver/executor do, the Worker has two ways (resourceFile & resourceDiscoveryScript) to set up resources when it starts up. Users could use
SPARK_WORKER_OPTS
to apply resource configs to the Worker in the form of "-Dx=y". For example, see the sketch below.
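A hedged example of such a config in spark-env.sh (the paths are made up; spark.worker.resourcesFile is the config added by this PR, while the per-resource discovery script key is assumed to follow Spark's standard resource config pattern):

```
# Point the worker at a pre-built resources file (path is hypothetical):
SPARK_WORKER_OPTS="-Dspark.worker.resourcesFile=/opt/spark/conf/worker-resources.json"

# Or let the worker run a discovery script (key pattern assumed):
# SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.discoveryScript=/opt/spark/bin/getGpus.sh"
```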
How was this patch tested?
Tested manually in Standalone locally:
- Worker could start up normally when no resources are configured
- Worker should fail to start up when an exception is thrown during resource setup (e.g. unknown directory, parse failure)
- Worker could set up resources from a resource file
- Worker could set up resources from discovery scripts
- Worker should set up resources from both the resource file & discovery scripts when both are configured