[SPARK-4140] Document dynamic allocation #3731

@@ -56,6 +56,112 @@ the same RDDs. For example, the [Shark](http://shark.cs.berkeley.edu) JDBC serve
queries. In future releases, in-memory storage systems such as [Tachyon](http://tachyon-project.org) will
provide another approach to share RDDs.

## Dynamic Resource Allocation

Spark 1.2 introduces the ability to dynamically scale the set of cluster resources allocated to
your application up and down based on the workload. This means that your application may give
resources back to the cluster if they are no longer used and request them again later when there
is demand. This feature is particularly useful if multiple applications share resources in your
Spark cluster. If a subset of the resources allocated to an application becomes idle, it can be
returned to the cluster's pool of resources and acquired by other applications. In Spark, dynamic
resource allocation is performed at the granularity of the executor and can be enabled through
`spark.dynamicAllocation.enabled`.

This feature is currently disabled by default and available only on [YARN](running-on-yarn.html).
A future release will extend this to [standalone mode](spark-standalone.html) and
[Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes). Note that although Spark on
Mesos already has a similar notion of dynamic resource sharing in fine-grained mode, enabling
dynamic allocation allows your Mesos application to take advantage of coarse-grained low-latency
scheduling while sharing cluster resources efficiently.

Lastly, it is worth noting that Spark's dynamic resource allocation mechanism is cooperative.
This means if a Spark application enables this feature, other applications on the same cluster
are also expected to do so. Otherwise, the cluster's resources will end up being unfairly
distributed to the applications that do not voluntarily give up unused resources they have
acquired.

> I would possibly rephrase or leave this paragraph out, as there are situations where different
> dynamicAllocation.enabled settings for different applications are reasonable. I.e. a cluster
> might have some production applications that need a static allocation to cache data and respond
> to queries as fast as possible, while others might be interactive and have highly varying
> resource use. YARN is meant to take care of the fairness aspect.
>
> fair enough

### Configuration and Setup

All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and
provide lower and upper bounds for the number of executors through
`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant
configurations are described on the [configurations page](configuration.html#dynamic-allocation)
and in detail in the subsequent sections.

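As a concrete sketch, enabling the feature might look like the following additions to
`spark-defaults.conf`. The property names are the ones referenced above; the bound values are
illustrative placeholders, not recommendations:

```
spark.dynamicAllocation.enabled        true
spark.dynamicAllocation.minExecutors   2
spark.dynamicAllocation.maxExecutors   20
```
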
Additionally, your application must use an external shuffle service (described below). To enable
this, set `spark.shuffle.service.enabled` to `true`. In YARN, this external shuffle service is
implemented in `org.apache.spark.yarn.network.YarnShuffleService`, which runs in each `NodeManager`
in your cluster.

> It would be nice to add a short clause explaining why this is the case.
>
> Should this be broken out into a separate section for users that don't care about dynamic
> allocation, but want to learn how to use the external shuffle service?
>
> +1 to add how to use the external shuffle service, since we need to enable the external shuffle
> service to use dynamic allocation.
>
> This is just a reference to the external shuffle service. The actual documentation for the
> service will be in its own section elsewhere outside of this patch.
>
> Makes sense to me. How about you, @sryza?
>
> Are you saying this section will be replaced with a pointer to the external shuffle service doc
> once it's added? If so, looks good to me.

To start this service, follow these steps:

1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
pre-packaged distribution.
2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
`$SPARK_HOME/network/yarn/target/scala-<version>` if you are building Spark yourself, and under
`lib` if you are using a distribution.
3. Add this jar to the classpath of all `NodeManager`s in your cluster.
4. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
`org.apache.spark.yarn.network.YarnShuffleService`. Additionally, set all relevant
`spark.shuffle.service.*` [configurations](configuration.html). A sketch of the `yarn-site.xml`
additions follows this list.
5. Restart all `NodeManager`s in your cluster.
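
As a rough sketch of step 4, the `yarn-site.xml` additions might look like the following. If your
cluster already lists other auxiliary services in `yarn.nodemanager.aux-services`, keep them in the
comma-separated value alongside `spark_shuffle`:

```xml
<!-- Register the Spark shuffle service as a NodeManager auxiliary service -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.yarn.network.YarnShuffleService</value>
</property>
```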

### Resource Allocation Policy

At a high level, Spark should relinquish executors when they are no longer used and acquire
executors when they are needed. Since there is no definitive way to predict whether an executor
that is about to be removed will run a task in the near future, or whether a new executor that is
about to be added will actually be idle, we need a set of heuristics to determine when to remove
and request executors.

#### Request Policy

A Spark application with dynamic allocation enabled requests additional executors when it has
pending tasks waiting to be scheduled. This condition necessarily implies that the existing set
of executors is insufficient to simultaneously saturate all tasks that have been submitted but
not yet finished.

Spark requests executors in rounds. The actual request is triggered when there have been pending
tasks for `spark.dynamicAllocation.schedulerBacklogTimeout` seconds, and then triggered again
every `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout` seconds thereafter if the queue
of pending tasks persists. Additionally, the number of executors requested in each round increases
exponentially from the previous round. For instance, an application will add 1 executor in the
first round, and then 2, 4, 8 and so on executors in the subsequent rounds.

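To make the ramp-up concrete, here is a small sketch of the behavior described above. It is only
an illustration of the doubling schedule, not Spark's actual scheduling code:

```scala
// Illustrative only: models the exponential ramp-up described above.
// Each round, while pending tasks remain queued, the number of executors
// added doubles: 1, 2, 4, 8, ...
def executorsAddedInRound(round: Int): Int = 1 << round  // round is 0-based

// For example, if the backlog persists for four rounds, the application will
// have requested 1 + 2 + 4 + 8 = 15 additional executors in total (always
// subject to the spark.dynamicAllocation.maxExecutors upper bound).
val totalAfterFourRounds = (0 until 4).map(executorsAddedInRound).sum
```
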
The motivation for an exponential increase policy is twofold. First, an application should request
executors cautiously in the beginning in case it turns out that only a few additional executors are
sufficient. This echoes the justification for TCP slow start. Second, the application should be
able to ramp up its resource usage in a timely manner in case it turns out that many executors are
actually needed.

#### Remove Policy

The policy for removing executors is much simpler. A Spark application removes an executor when
it has been idle for more than `spark.dynamicAllocation.executorIdleTimeout` seconds. Note that,
under most circumstances, this condition is mutually exclusive with the request condition, in that
an executor should not be idle if there are still pending tasks to be scheduled.

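Taken together, the request and remove policies are governed by a handful of timeouts, which might
be tuned along the lines of the following `spark-defaults.conf` sketch. The values shown are
illustrative placeholders (in seconds), not recommendations:

```
spark.dynamicAllocation.schedulerBacklogTimeout            5
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout   5
spark.dynamicAllocation.executorIdleTimeout                600
```
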
### Graceful Decommission of Executors

> This section should mention issues with caching data.
>
> you're right

Before dynamic allocation, a Spark executor exits either on failure or when the associated
application has also exited. In both scenarios, all state associated with the executor is no
longer needed and can be safely discarded. With dynamic allocation, however, the application
is still running when an executor is explicitly removed. If the application attempts to access
state stored in or written by the executor, it will have to recompute that state. Thus,
Spark needs a mechanism to decommission an executor gracefully by preserving its state before
removing it.

This requirement is especially important for shuffles. During a shuffle, the Spark executor first
writes its own map outputs locally to disk, and then acts as the server for those files when other
executors attempt to fetch them. In the event of stragglers, which are tasks that run for much
longer than their peers, dynamic allocation may remove an executor before the shuffle completes,
in which case the shuffle files written by that executor must be recomputed unnecessarily.

The solution for preserving shuffle files is to use an external shuffle service, also introduced
in Spark 1.2. This service refers to a long-running process that runs on each node of your cluster
independently of your Spark applications and their executors. If the service is enabled, Spark
executors will fetch shuffle files from the service instead of from each other. This means any
shuffle state written by an executor may continue to be served beyond the executor's lifetime.

# Scheduling Within an Application