From 53cff5840923af81e048c0333117dc48163e4408 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ozawa Date: Sat, 13 Dec 2014 16:34:25 +0900 Subject: [PATCH 1/5] Adding a documentation about dynamic resource allocation. Signed-off-by: Tsuyoshi Ozawa --- docs/configuration.md | 49 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 64aa94f622afa..0e8eff405e719 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1142,6 +1142,55 @@ Apart from these, the following properties are also available, and may be useful +#### Dynamic allocation + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.dynamicAllocation.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enabling dynamic resource allocation.
+    Note that <code>spark.shuffle.service.enabled</code> need to be also true if cluster mode is YARN.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.minExecutors</code></td>
+  <td></td>
+  <td>
+    Minimum value of executors when dynamic allocation is enabled.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.maxExecutors</code></td>
+  <td></td>
+  <td>
+    Maximum value of executors when dynamic allocation is enabled.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
+  <td></td>
+  <td>
+    If there are backlogged tasks for this duration, add new executors.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.sustainedSchedulerBacklogTimeout</code></td>
+  <td></td>
+  <td>
+    If the backlog is sustained for this duration, add more executors
+    This is used only after the initial backlog timeout is exceeded.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
+  <td></td>
+  <td>
+    If an executor has been idle for this duration, remove it.
+  </td>
+</tr>
+</table>
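For illustration only, the properties above might be set together in `spark-defaults.conf` along the following lines; the numeric values are placeholders rather than recommendations, and `spark.shuffle.service.enabled` is included because it must also be true on YARN:

```
# Hypothetical spark-defaults.conf entries (example values only)
spark.dynamicAllocation.enabled                            true
spark.dynamicAllocation.minExecutors                       2
spark.dynamicAllocation.maxExecutors                       20
spark.dynamicAllocation.schedulerBacklogTimeout            60
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout   60
spark.dynamicAllocation.executorIdleTimeout                600
# Required in addition to the above when running on YARN
spark.shuffle.service.enabled                              true
```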
+ #### Cluster Managers Each cluster manager in Spark has additional configuration options. Configurations can be found on the pages for each mode: From 6827b56e37e25d64ce52af1caad742563c2bdb40 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ozawa Date: Wed, 17 Dec 2014 02:14:15 +0900 Subject: [PATCH 2/5] Fixing a documentation of spark.dynamicAllocation.enabled. Signed-off-by: Tsuyoshi Ozawa --- docs/configuration.md | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 0e8eff405e719..dbd1d095d07a5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1149,42 +1149,54 @@ Apart from these, the following properties are also available, and may be useful spark.dynamicAllocation.enabled false - Enabling dynamic resource allocation. + Enabling dynamic allocations and removals of executors based on the workload. + The add policy depends on whether there are backlogged tasks waiting to be scheduled. If + the scheduler queue is not drained in N seconds, then new executors are added. + N is configured via spark.dynamicAllocation.schedulerBacklogTimeout and + spark.dynamicAllocation.sustainedSchedulerBacklogTimeout. + If the queue persists for another M seconds, then more executors are added and so on. + M is configured via spark.dynamicAllocation.executorIdleTimeout. + The number added in each round increases exponentially from the previous round until an upper bound on the + number of executors has been reached. The upper bound is based both on a configured property + and on the number of tasks pending: the policy will never increase the number of executor + requests past the number needed to handle all pending tasks. + To enable this feature, spark.dynamicAllocation.minExecutors and + spark.dynamicAllocation.maxExecutors must be configured. Note that spark.shuffle.service.enabled need to be also true if cluster mode is YARN. spark.dynamicAllocation.minExecutors - + -1 Minimum value of executors when dynamic allocation is enabled. spark.dynamicAllocation.maxExecutors - + -1 Maximum value of executors when dynamic allocation is enabled. - spark.dynamicAllocation.schedulerBacklogTimeout - + spark.dynamicAllocation.schedulerBacklogTimeout + 60 If there are backlogged tasks for this duration, add new executors. spark.dynamicAllocation.sustainedSchedulerBacklogTimeout - + 60 - If the backlog is sustained for this duration, add more executors + If the backlog is sustained for this duration, add more executors. This is used only after the initial backlog timeout is exceeded. spark.dynamicAllocation.executorIdleTimeout - + 600 If an executor has been idle for this duration, remove it. From 8c64004029ece1700fb5930ba1e9ddc19c1c3bd1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 17 Dec 2014 22:35:38 -0800 Subject: [PATCH 3/5] Add documentation for dynamic allocation (without configs) --- docs/job-scheduling.md | 106 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md index 94604f301dd46..584d8de1bd681 100644 --- a/docs/job-scheduling.md +++ b/docs/job-scheduling.md @@ -56,6 +56,112 @@ the same RDDs. For example, the [Shark](http://shark.cs.berkeley.edu) JDBC serve queries. In future releases, in-memory storage systems such as [Tachyon](http://tachyon-project.org) will provide another approach to share RDDs. 
+## Dynamic Resource Allocation + +Spark 1.2 introduces the ability to dynamically scale the set of cluster resources allocated to +your application up and down based on the workload. This means that your application may give +resources back to the cluster if they are no longer used and request them again later when there +is demand. This feature is particularly useful if multiple applications share resources in your +Spark cluster. If a subset of the resources allocated to an application becomes idle, it can be +returned to the cluster's pool of resources and acquired by other applications. In Spark, dynamic +resource allocation is performed on the granularity of the executor and can be enabled through +`spark.dynamicAllocation.enabled`. + +This feature is currently disabled by default and available only on [YARN](running-on-yarn.html). +A future release will extend this to [standalone mode](spark-standalone.html) and +[Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes). Note that although Spark on +Mesos already has a similar notion of dynamic resource sharing in fine-grained mode, enabling +dynamic allocation allows your Mesos application to take advantage of coarse-grained low-latency +scheduling while sharing cluster resources efficiently. + +Lastly, it is worth noting that Spark's dynamic resource allocation mechanism is cooperative. +This means if a Spark application enables this feature, other applications on the same cluster +are also expected to do so. Otherwise, the cluster's resources will end up being unfairly +distributed to the applications that do not voluntarily give up unused resources they have +acquired. + +### Configuration and Setup + +All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace. +To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and +provide lower and upper bounds for the number of executors through +`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant +configurations are described on the [configurations page](configuration.html) and in the subsequent +sections in detail. + +Additionally, your application must use an external shuffle service (described below). To enable +this, set `spark.shuffle.service.enabled` to `true`. In YARN, this external shuffle service is +implemented in `org.apache.spark.yarn.network.YarnShuffleService` that runs in each `NodeManager` +in your cluster. To start this service, follow these steps: + +1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a +pre-packaged distribution. +2. Locate the `spark--yarn-shuffle.jar`. This should be under +`$SPARK_HOME/network/yarn/target/scala-` if you are building Spark yourself, and under +`lib` if you are using a distribution. +2. Add this jar to the classpath of all `NodeManager`s in your cluster. +3. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`, +then set `yarn.nodemanager.aux-services.spark_shuffle.class` to +`org.apache.spark.yarn.network.YarnShuffleService`. Additionally, set all relevant +`spark.shuffle.service.*` [configurations](configuration.html). +4. Restart all `NodeManager`s in your cluster. + +### Resource Allocation Policy + +On a high level, Spark should relinquish executors when they are no longer used and acquire +executors when they are needed. 
Since there is no definitive way to predict whether an executor
+that is about to be removed will run a task in the near future, or whether a new executor that is
+about to be added will actually be idle, we need a set of heuristics to determine when to remove
+and request executors.
+
+#### Request Policy
+
+A Spark application with dynamic allocation enabled requests additional executors when it has
+pending tasks waiting to be scheduled. This condition necessarily implies that the existing set
+of executors is insufficient to simultaneously saturate all tasks that have been submitted but
+not yet finished.
+
+Spark requests executors in rounds. The actual request is triggered when there have been pending
+tasks for `spark.dynamicAllocation.schedulerBacklogTimeout` seconds, and then triggered again
+every `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout` seconds thereafter if the queue
+of pending tasks persists. Additionally, the number of executors requested in each round increases
+exponentially from the previous round. For instance, an application will add 1 executor in the
+first round, and then 2, 4, 8 and so on executors in the subsequent rounds.
+
+The motivation for an exponential increase policy is twofold. First, an application should request
+executors cautiously in the beginning in case it turns out that only a few additional executors are
+sufficient. This echoes the justification for TCP slow start. Second, the application should be
+able to ramp up its resource usage in a timely manner in case it turns out that many executors are
+actually needed.
+
+#### Remove Policy
+
+The policy for removing executors is much simpler. A Spark application removes an executor when
+it has been idle for more than `spark.dynamicAllocation.executorIdleTimeout` seconds. Note that,
+under most circumstances, this condition is mutually exclusive with the request condition, in that
+an executor should not be idle if there are still pending tasks to be scheduled.
+
+### Graceful Decommission of Executors
+
+Before dynamic allocation, a Spark executor exits either on failure or when the associated
+application has also exited. In both scenarios, all state associated with the executor is no
+longer needed and can be safely discarded. With dynamic allocation, however, the application
+is still running when an executor is explicitly removed. If the application attempts to access
+state stored in or written by the executor, it will have to recompute that state. Thus,
+Spark needs a mechanism to decommission an executor gracefully by preserving its state before
+removing it.
+
+This requirement is especially important for shuffles. During a shuffle, the Spark executor first
+writes its own map outputs locally to disk, and then acts as the server for those files when other
+executors attempt to fetch them. In the event of stragglers, which are tasks that run for much
+longer than their peers, dynamic allocation may remove an executor before the shuffle completes,
+in which case the shuffle files written by that executor must be recomputed unnecessarily.
+
+The solution for preserving shuffle files is to use an external shuffle service, also introduced
+in Spark 1.2. This service refers to a long-running process that runs on each node of your cluster
+independently of your Spark applications and their executors. If the service is enabled, Spark
+executors will fetch shuffle files from the service instead of from each other.
This means any +shuffle state written by an executor may continue to be served beyond the executor's lifetime. # Scheduling Within an Application From b9843f2c673f30c5111f2a2a29e15dcde00042db Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 17 Dec 2014 22:50:25 -0800 Subject: [PATCH 4/5] Document the configs as well --- docs/configuration.md | 122 ++++++++++++++++++++--------------------- docs/job-scheduling.md | 4 +- 2 files changed, 63 insertions(+), 63 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index dbd1d095d07a5..2c8dea869b092 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1008,6 +1008,67 @@ Apart from these, the following properties are also available, and may be useful +#### Dynamic allocation + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.dynamicAllocation.enabled</code></td>
+  <td>false</td>
+  <td>
+    Whether to use dynamic resource allocation, which scales the number of executors registered
+    with this application up and down based on the workload. Note that this is currently only
+    available on YARN mode. For more detail, see the description here.
+    <br>
+    This requires the following configurations to be set:
+    <code>spark.dynamicAllocation.minExecutors</code>,
+    <code>spark.dynamicAllocation.maxExecutors</code>, and
+    <code>spark.shuffle.service.enabled</code>
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.minExecutors</code></td>
+  <td>(none)</td>
+  <td>
+    Lower bound for the number of executors if dynamic allocation is enabled (required).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.maxExecutors</code></td>
+  <td>(none)</td>
+  <td>
+    Upper bound for the number of executors if dynamic allocation is enabled (required).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
+  <td>60</td>
+  <td>
+    If dynamic allocation is enabled and there have been pending tasks backlogged for more than
+    this duration (in seconds), new executors will be requested. For more detail, see this
+    description.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.sustainedSchedulerBacklogTimeout</code></td>
+  <td><code>schedulerBacklogTimeout</code></td>
+  <td>
+    Same as <code>spark.dynamicAllocation.schedulerBacklogTimeout</code>, but used only for
+    subsequent executor requests. For more detail, see this description.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
+  <td>600</td>
+  <td>
+    If dynamic allocation is enabled and an executor has been idle for more than this duration
+    (in seconds), the executor will be removed. For more detail, see this description.
+  </td>
+</tr>
+</table>
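As a rough sketch, the same settings could also be applied programmatically through `SparkConf`; the application name and the numeric bounds below are made-up examples, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DynamicAllocationExample {
  def main(args: Array[String]): Unit = {
    // Example values only; the min/max bounds must be chosen for your workload.
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example")          // hypothetical application name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")  // required lower bound
      .set("spark.dynamicAllocation.maxExecutors", "20") // required upper bound
      .set("spark.shuffle.service.enabled", "true")      // external shuffle service, needed on YARN

    val sc = new SparkContext(conf)
    // ... run jobs here; executors are added and removed based on the workload ...
    sc.stop()
  }
}
```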
+ #### Security @@ -1142,67 +1203,6 @@ Apart from these, the following properties are also available, and may be useful
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
-#### Dynamic allocation
-
-<table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
-<tr>
-  <td><code>spark.dynamicAllocation.enabled</code></td>
-  <td>false</td>
-  <td>
-    Enabling dynamic allocations and removals of executors based on the workload.
-    The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
-    the scheduler queue is not drained in N seconds, then new executors are added. N is
-    configured via <code>spark.dynamicAllocation.schedulerBacklogTimeout</code> and
-    <code>spark.dynamicAllocation.sustainedSchedulerBacklogTimeout</code>.
-    If the queue persists for another M seconds, then more executors are added and so on.
-    M is configured via <code>spark.dynamicAllocation.executorIdleTimeout</code>.
-    The number added in each round increases exponentially from the previous round until an
-    upper bound on the number of executors has been reached. The upper bound is based both on
-    a configured property and on the number of tasks pending: the policy will never increase
-    the number of executor requests past the number needed to handle all pending tasks.
-    To enable this feature, <code>spark.dynamicAllocation.minExecutors</code> and
-    <code>spark.dynamicAllocation.maxExecutors</code> must be configured.
-    Note that <code>spark.shuffle.service.enabled</code> need to be also true if cluster mode is YARN.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.dynamicAllocation.minExecutors</code></td>
-  <td>-1</td>
-  <td>
-    Minimum value of executors when dynamic allocation is enabled.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.dynamicAllocation.maxExecutors</code></td>
-  <td>-1</td>
-  <td>
-    Maximum value of executors when dynamic allocation is enabled.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
-  <td>60</td>
-  <td>
-    If there are backlogged tasks for this duration, add new executors.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.dynamicAllocation.sustainedSchedulerBacklogTimeout</code></td>
-  <td>60</td>
-  <td>
-    If the backlog is sustained for this duration, add more executors.
-    This is used only after the initial backlog timeout is exceeded.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
-  <td>600</td>
-  <td>
-    If an executor has been idle for this duration, remove it.
-  </td>
-</tr>
-</table>
- #### Cluster Managers Each cluster manager in Spark has additional configuration options. Configurations can be found on the pages for each mode: diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md index 584d8de1bd681..78c61dce2ea1f 100644 --- a/docs/job-scheduling.md +++ b/docs/job-scheduling.md @@ -86,8 +86,8 @@ All configurations used by this feature live under the `spark.dynamicAllocation. To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and provide lower and upper bounds for the number of executors through `spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant -configurations are described on the [configurations page](configuration.html) and in the subsequent -sections in detail. +configurations are described on the [configurations page](configuration.html#dynamic-allocation) +and in the subsequent sections in detail. Additionally, your application must use an external shuffle service (described below). To enable this, set `spark.shuffle.service.enabled` to `true`. In YARN, this external shuffle service is From 1281447a71f87e1a45b7a8b2ef7faabc3d300fc1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 19 Dec 2014 19:18:03 -0800 Subject: [PATCH 5/5] Address a few comments --- docs/job-scheduling.md | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md index 78c61dce2ea1f..dfbb871f01d2c 100644 --- a/docs/job-scheduling.md +++ b/docs/job-scheduling.md @@ -74,12 +74,6 @@ Mesos already has a similar notion of dynamic resource sharing in fine-grained m dynamic allocation allows your Mesos application to take advantage of coarse-grained low-latency scheduling while sharing cluster resources efficiently. -Lastly, it is worth noting that Spark's dynamic resource allocation mechanism is cooperative. -This means if a Spark application enables this feature, other applications on the same cluster -are also expected to do so. Otherwise, the cluster's resources will end up being unfairly -distributed to the applications that do not voluntarily give up unused resources they have -acquired. - ### Configuration and Setup All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace. @@ -89,9 +83,11 @@ provide lower and upper bounds for the number of executors through configurations are described on the [configurations page](configuration.html#dynamic-allocation) and in the subsequent sections in detail. -Additionally, your application must use an external shuffle service (described below). To enable -this, set `spark.shuffle.service.enabled` to `true`. In YARN, this external shuffle service is -implemented in `org.apache.spark.yarn.network.YarnShuffleService` that runs in each `NodeManager` +Additionally, your application must use an external shuffle service. The purpose of the service is +to preserve the shuffle files written by executors so the executors can be safely removed (more +detail described [below](job-scheduling.html#graceful-decommission-of-executors)). To enable +this service, set `spark.shuffle.service.enabled` to `true`. In YARN, this external shuffle service +is implemented in `org.apache.spark.yarn.network.YarnShuffleService` that runs in each `NodeManager` in your cluster. To start this service, follow these steps: 1. Build Spark with the [YARN profile](building-spark.html). 
Skip this step if you are using a @@ -108,7 +104,7 @@ then set `yarn.nodemanager.aux-services.spark_shuffle.class` to ### Resource Allocation Policy -On a high level, Spark should relinquish executors when they are no longer used and acquire +At a high level, Spark should relinquish executors when they are no longer used and acquire executors when they are needed. Since there is no definitive way to predict whether an executor that is about to be removed will run a task in the near future, or whether a new executor that is about to be added will actually be idle, we need a set of heuristics to determine when to remove @@ -163,6 +159,12 @@ independently of your Spark applications and their executors. If the service is executors will fetch shuffle files from the service instead of from each other. This means any shuffle state written by an executor may continue to be served beyond the executor's lifetime. +In addition to writing shuffle files, executors also cache data either on disk or in memory. +When an executor is removed, however, all cached data will no longer be accessible. There is +currently not yet a solution for this in Spark 1.2. In future releases, the cached data may be +preserved through an off-heap storage similar in spirit to how shuffle files are preserved through +the external shuffle service. + # Scheduling Within an Application Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if