
[SPARK-27369][CORE] Setup resources when Standalone Worker starts up #24841

Closed
wants to merge 7 commits

Conversation

@Ngone51 (Member) commented Jun 11, 2019

What changes were proposed in this pull request?

To support GPU-aware scheduling in Standalone (cluster mode), the Worker should be able to set up resources (e.g. GPU/FPGA) when it starts up.

Similar to what the driver/executor do, the Worker has two ways (resourceFile & resourceDiscoveryScript) to set up resources when it starts up. Users can use SPARK_WORKER_OPTS to apply resource configs to the Worker in the form of "-Dx=y". For example,

SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=2 \
                   -Dspark.worker.resource.fpga.amount=1 \
                   -Dspark.worker.resource.fpga.discoveryScript=/Users/wuyi/tmp/getFPGAResources.sh \
                   -Dspark.worker.resourcesFile=/Users/wuyi/tmp/worker-resource-file"
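
For reference, a discovery script such as getFPGAResources.sh above is expected to print a single JSON object describing the resource it finds, roughly in this shape (the addresses below are purely illustrative):

{"name": "fpga", "addresses": ["0", "1"]}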

How was this patch tested?

Tested manually in Standalone mode locally:

  • Worker could start up normally when no resources are configured

  • Worker should fail to start up when an exception is thrown during resource setup (e.g. unknown directory, parse failure)

  • Worker could set up resources from a resource file

  • Worker could set up resources from discovery scripts

  • Worker should set up resources from both the resource file & discovery scripts when both are configured.

@SparkQA commented Jun 11, 2019

Test build #106387 has finished for PR 24841 at commit 92c0b52.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member Author) commented Jun 11, 2019

cc @jiangxb1987 @mengxr @tgravescs

@tgravescs (Contributor):

PR #24821 is adding an abstraction layer for the resource handling. I think these will conflict, so we need to decide which one to put in first.

@Ngone51 (Member Author) commented Jun 11, 2019

Yeah, I noticed that but haven't gone through it yet. In this PR, the Worker sets up resources in a way similar to the driver/executor (resource file first, discovery script second). So, I guess merging this PR into #24821 may just need some similar modifications?

And I'm also OK to wait for #24821, since this PR is a smaller one and easier to update.

@@ -220,6 +225,38 @@ private[deploy] class Worker(
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}

// TODO if we're starting up multi workers under the same host, discovery script won't work.
Contributor:
How would the resourceFile work under your approach in the scenario of multiple workers launched on the same host? We need to avoid multiple workers accessing the same external resource address.

Member Author:
For the initial version, my base assumption here is that the user is responsible for configuring the right resourceFile for each of the multiple workers on the same host.

Contributor:
Technically a discovery script could work, but the writer of that script would have to handle that case inside the script to assign each worker different resources. Either way we should be sure to document it.

Member Author:
Yeah, technically we can do that, but I think it may be more troublesome to handle in a script than with a resourceFile. Documenting it is a good idea; done in ade97c2.

resources = resourceFile.map { rFile =>
ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
}.getOrElse {
if (resourceDiscoveryScript.isEmpty) {
Contributor:
We don't want to reuse the discoveryScript config?

Member Author:
Not sure what you actually mean here, so let me just explain my understanding. resourceDiscoveryScript is configured via SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT or --resource-script, which are specifically for the Worker. And the application's (driver/executor) discoveryScript config hasn't been set up yet while the Worker is starting up.

ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
}.getOrElse {
if (resourceDiscoveryScript.isEmpty) {
logWarning(s"Neither resourceFile nor resourceDiscoveryScript found for worker. " +
Contributor:
I'm not sure you want to warn here. If users just aren't configuring Spark to use other resources, there is no reason to warn.

}
} catch {
case t: Throwable =>
logWarning("Failed to setup worker resources: ", t)
Contributor:
logError

}
}
} catch {
case t: Throwable =>
Contributor:
is there a reason we catch all Throwable here? I think just Exception makes more sense.

resourceFile = Some(System.getenv("SPARK_WORKER_RESOURCE_FILE"))
}
if (System.getenv("SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT") != null) {
parseResourceDiscoveryScript(conf.getenv("SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT"))
Contributor:
might be easier to read if we have parseResourceDiscoveryScript return a Map and set it here.
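
A minimal sketch of that suggestion (the helper name comes from the diff above, but its signature and the expected input format are assumptions, not the merged code):

// Sketch only: parse "resourceName=scriptPath" pairs into a Map and assign it
// at the call site instead of mutating state inside the parser.
private def parseResourceDiscoveryScript(scriptConf: String): Map[String, String] = {
  scriptConf.split(",").map { entry =>
    val Array(name, script) = entry.split("=", 2)  // assumed "name=path" format
    name.trim -> script.trim
  }.toMap
}

if (System.getenv("SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT") != null) {
  resourceDiscoveryScript =
    parseResourceDiscoveryScript(System.getenv("SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT"))
}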

@@ -36,6 +36,8 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
var memory = inferDefaultMemory()
var masters: Array[String] = null
var workDir: String = null
var resourceFile: Option[String] = None
var resourceDiscoveryScript: Map[String, String] = Map()
Contributor:
= Map.empty might be more clear that this is immutable.

@@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
_worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, securityMgr, shuffleServiceSupplier)
"Worker", "/tmp", conf, securityMgr, None, Map.empty, shuffleServiceSupplier)
Contributor:
can we add more tests that actually get the resources?

Contributor:
If it's easier to do with the next JIRA for assignments, that is fine too.

Member Author:
We would need to bring a Master into the test to check whether the Worker sets up the resources correctly, since a Worker cannot start up without a live Master. Maybe we could cover this in the following JIRA task, which will share resource info between Master and Worker.

Contributor:
sounds fine

@viirya (Member) left a comment:
Not sure if I understand correctly, but from a quick look at the description, the executor will also set up resources? Doesn't the executor set them up again if the Worker has already set them up?

@Ngone51 (Member Author) commented Jun 13, 2019

@viirya IIUC, the executor sets up the resources from what the worker assigned to it. For example, the worker could "split" its own resources into separate resource files according to the Master's requirements for executors. Then, an executor could set up its resources from the corresponding resource file when it starts up.
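
A rough sketch of that splitting idea (purely illustrative; none of these names exist in this PR):

// Illustrative only: divide the worker's discovered addresses into
// per-executor chunks that could later be written out as resource files.
def splitAddresses(
    workerAddresses: Seq[String],
    amountPerExecutor: Int,
    numExecutors: Int): Seq[Seq[String]] = {
  require(workerAddresses.size >= amountPerExecutor * numExecutors,
    "not enough resource addresses for the requested executors")
  workerAddresses.take(amountPerExecutor * numExecutors)
    .grouped(amountPerExecutor).toSeq
}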

@SparkQA commented Jun 13, 2019

Test build #106450 has finished for PR 24841 at commit 14203f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Jun 13, 2019

@Ngone51 Sounds reasonable. So the splitting logic will be implemented in a follow-up?

@Ngone51 (Member Author) commented Jun 13, 2019

@viirya Yeah, I think so.

@jiangxb1987 (Contributor):

@Ngone51 Could you please rebase the PR so we can continue working on this?

@Ngone51 (Member Author) commented Jun 20, 2019

@jiangxb1987 sure, will do it.

@Ngone51 force-pushed the dev-worker-resources-setup branch from 14203f5 to a1cd955 on June 21, 2019 05:53
@SparkQA commented Jun 21, 2019

Test build #106750 has finished for PR 24841 at commit a1cd955.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 21, 2019

Test build #106752 has finished for PR 24841 at commit 0960bcf.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member Author) commented Jun 21, 2019

Jenkins, retest this please.

@SparkQA commented Jun 21, 2019

Test build #106759 has finished for PR 24841 at commit 0960bcf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ConfigBuilder("spark.worker.resourcesFile")
.internal()
.doc("Path to a file containing the resources allocated to the worker. " +
"The file should be formatted as a JSON array of ResourceInformation objects. " +
Contributor:
I don't think this is true anymore; I think the JSON has to be formatted like a ResourceAllocation.

Member Author:
Thanks for catching this.
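
For context, a resources file in the ResourceAllocation shape would look roughly like this (field values are illustrative, and the exact format should be checked against ResourceUtils):

[{"id": {"componentName": "spark.worker", "resourceName": "gpu"},
  "addresses": ["0", "1"]}]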

@SparkQA commented Jun 24, 2019

Test build #106842 has finished for PR 24841 at commit ba62e35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 (Contributor) left a comment:
Looks good. This is a straightforward change that enables the Worker to load resource information. Syncing resource information between the master and workers, as well as splitting resources among different workers on the same node, will be addressed in a separate PR.

@@ -41,6 +41,8 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.ResourceUtils.getOrDiscoverAllResources
Contributor:
nit: I would prefer import org.apache.spark.resource.ResourceUtils._.

@jiangxb1987 (Contributor):

cc @mengxr @tgravescs

@SparkQA commented Jun 25, 2019

Test build #106858 has finished for PR 24841 at commit 41a70bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123 (Contributor) left a comment:
Two issues:

  • If the discovery script outputs a fixed allocation like {"name": "gpu", "addresses": ["0","1"]}, how do we avoid launching multiple workers on the same worker node (which will cause resource conflicts)? We should add some code to handle this case.
  • If the discovery script dynamically allocates resources for the launching worker, should we limit the script to allocating exactly the requested amount of resources? Allocating more than requested would be wasted, because we may launch multiple workers on the same node.

@Ngone51 (Member Author) commented Jun 26, 2019

how do we avoid launching multiple workers on the same worker node (which will cause resource conflicts)?

The current plan is to let the Master act as the coordinator to avoid resource conflicts. So multiple workers are still allowed to be launched on the same node, but if the available resources cannot meet a worker's requested amount, it will fail to launch.

If the discovery script dynamically allocates resources for the launching worker, should we limit the script to allocating exactly the requested amount of resources? Allocating more than requested would be wasted, because we may launch multiple workers on the same node.

If the Master is the coordinator, then the surplus resources could be allocated to other workers launched on the same node. If no more workers launch, those resources would be wasted.

Another possibility is setting the requested amount to the number of resources found by the script if the user does not configure the resource amount explicitly (similar to what cpu and memory do). In this way, once a worker launches on the node, no other worker (using the same script) could launch on the same node, as the first worker has already grabbed all the resources. This might achieve the goal of issue 1.

@WeichenXu123 (Contributor) commented Jun 26, 2019

@Ngone51

Oh, let me clarify my issues:
Issue 1
I mean, suppose this case: on machine node A we have provided a discovery script whose output is {"name": "gpu", "addresses": ["0","1"]}; if we then launch 2 workers on node A, what will happen? The 2 workers are both allocated gpu-0 and gpu-1. Where is the code to detect this case and raise an error?

Issue 2
I mean, for example, suppose we have configured the worker's requested gpu amount to be 2, but the discovery script allocates more than 2 gpus when the worker launches; is that a good discovery script? I think the discovery script should allocate exactly the gpu amount the worker requests, so I suggest that the verification in assertAllResourceAllocationsMeetRequests be "numAllocatedByDiscoveryScript == numRequested" (the current verification is "numAllocatedByDiscoveryScript >= numRequested").

Thanks :)

@tgravescs (Contributor):

  1. If you do this then your script has to handle this case. This is not impossible, it's just left up to the user. Or you could have the Master coordinate, but you would need to make sure there isn't some sort of isolation going on, because then the gpu ids could potentially overlap. Really, if you do this I would recommend the other method of providing the resource file, and then you can split the GPUs between your workers (see the sketch after this list). The script would just give you an automated way to do that.
  2. This can be run in lots of different environments; as long as you have at least the number you requested, it's fine. The discovery script isn't "allocating", it's discovering what's there. If you happen to have some hosts with 6 GPUs but only want to run with 4, this allows it. Originally the function did have a strict check, but really it would just be a nice-to-have for users, and I can see some people not wanting it for experimentation/testing. It doesn't functionally matter if the container has more.
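
As an illustration of that first point, two workers on the same host could each be pointed at a separate resources file so their GPU addresses do not overlap (the paths, the ResourceAllocation-style layout, and the addresses here are all assumptions for illustration):

# worker1-resources.json (hypothetical path)
[{"id": {"componentName": "spark.worker", "resourceName": "gpu"}, "addresses": ["0"]}]

# worker2-resources.json (hypothetical path)
[{"id": {"componentName": "spark.worker", "resourceName": "gpu"}, "addresses": ["1"]}]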

@jiangxb1987 (Contributor) commented Jun 26, 2019

Just my 2 cents: We may rely on the discovery script to discover all the available resources on the host, but that doesn't mean the worker shall occupy them all. My idea is that the Worker shall send the information about all available resources to the Master on register, and then the Master shall be responsible for coordination and tell the Worker which addresses it shall use.

One tricky thing here is when to recycle the resources. Ideally we shall recycle them when a Worker is lost, but as Tom mentioned, a lost Worker may still be running, so there is a possibility that the resource addresses are still being used. Instead, we shall make the Worker explicitly send a ReleaseResource message to the Master to release the resource addresses it no longer needs. This way we may completely avoid resource address collisions, but when a Worker dies silently, we may never recycle the resources allocated to it.

Issue 1
I mean, suppose this case: on machine node A we have provided a discovery script whose output is {"name": "gpu", "addresses": ["0","1"]}; if we then launch 2 workers on node A, what will happen? The 2 workers are both allocated gpu-0 and gpu-1. Where is the code to detect this case and raise an error?

The Master will keep track of which addresses have been allocated to each Worker, so when a new Worker joins, the Master will detect which addresses are free and make sure to allocate only the free addresses to it.

Issue 2
I mean, for example, suppose we have configured the worker's requested gpu amount to be 2, but the discovery script allocates more than 2 gpus when the worker launches; is that a good discovery script? I think the discovery script should allocate exactly the gpu amount the worker requests, so I suggest that the verification in assertAllResourceAllocationsMeetRequests be "numAllocatedByDiscoveryScript == numRequested" (the current verification is "numAllocatedByDiscoveryScript >= numRequested").

We don't require the number of addresses detected by the discovery script to be exactly the same as the request, because that assumption is too strong, and the hardware on a node may also change over time (we may add new devices, and devices may break). So we only require the number of discovered addresses to be greater than or equal to the request, and Spark will ensure we only use the requested number of devices.
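
A minimal sketch of that check (simplified; the real assertAllResourceAllocationsMeetRequests in ResourceUtils works on resource allocation/request objects, and the names below are illustrative):

// Simplified ">= requested" validation, as discussed above.
def assertAllocationsMeetRequests(
    allocated: Map[String, Seq[String]],    // resource name -> discovered addresses
    requested: Map[String, Int]): Unit = {  // resource name -> requested amount
  requested.foreach { case (name, amount) =>
    val found = allocated.getOrElse(name, Seq.empty).size
    require(found >= amount,
      s"Resource $name: discovered $found address(es) but $amount requested")
  }
}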

@Ngone51 (Member Author) commented Jun 27, 2019

Where is the code to detect this case and raise an error?

This part is planned to be done in the following PR :) @WeichenXu123

@jiangxb1987 (Contributor):

Thanks, merged to master!

@Ngone51 (Member Author) commented Jun 27, 2019

Thank you @jiangxb1987 @tgravescs @viirya @WeichenXu123
