[SPARK-22683][CORE] Add an executorAllocationRatio parameter to throttle the parallelism of dynamic allocation #19881
Conversation
Please see JIRA. I don't think this is worth doing.
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) | ||
|
||
private val tasksPerExecutorSlot = conf.getInt("spark.dynamicAllocation.tasksPerExecutorSlot", 1) |
I think we should change the name of this config, because Spark doesn't have the concept of slots, and it could be confusing to users who might expect exactly x tasks to be processed on each executor. I am thinking more along the lines of spark.dynamicAllocation.maxExecutorsPerStageDivisor, i.e. the max number of executors is the number of executors needed for that stage's tasks divided by this number. I'm open to other config names here though.
I think we would also need to define its interaction with spark.dynamicAllocation.maxExecutors, as well as how it behaves as the number of running/to-be-run tasks changes.
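For illustration, a minimal Scala sketch of the suggested divisor semantics and its interaction with spark.dynamicAllocation.maxExecutors; the function name cappedTarget and its parameters are hypothetical, not code from this PR:

```scala
// Hypothetical sketch of the suggested maxExecutorsPerStageDivisor semantics;
// this is not the code that ended up in the PR.
def cappedTarget(stageTasks: Int, tasksPerExecutor: Int, divisor: Int, maxExecutors: Int): Int = {
  // executors needed to run every task of the stage at once
  val fullParallelism = math.ceil(stageTasks.toDouble / tasksPerExecutor).toInt
  // throttle by the divisor, then apply the existing spark.dynamicAllocation.maxExecutors cap
  math.min(math.ceil(fullParallelism.toDouble / divisor).toInt, maxExecutors)
}
```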
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) | ||
|
||
private val tasksPerExecutorSlot = conf.getInt("spark.dynamicAllocation.tasksPerExecutorSlot", 1) | ||
|
||
private val tasksPerExecutor = tasksPerExecutorSlot * taskSlotPerExecutor |
Since we aren't using the concept of slots, I think we should leave tasksPerExecutor alone and put this functionality into maxNumExecutorsNeeded().
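A minimal sketch of that suggestion, assuming the extra factor is applied only when deriving the executor target while tasksPerExecutor keeps its original meaning; the standalone signature below is illustrative, not the class's actual method:

```scala
// Sketch only: tasksPerExecutor stays executorCores / taskCpus; the extra
// factor throttles only the derived executor target.
def maxNumExecutorsNeeded(
    pendingTasks: Int,
    runningTasks: Int,
    tasksPerExecutor: Int,
    tasksPerSlotFactor: Int): Int = {
  val tasks = pendingTasks + runningTasks
  math.ceil(tasks.toDouble / (tasksPerExecutor * tasksPerSlotFactor)).toInt
}
```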
ping @jcuquemelle can you update this?
Sorry, I didn't see the ping, I will have a look shortly.
Force-pushed from 2abd46f to 56c3f43.
The new semantics (throttling w.r.t. the max possible parallelism) is actually simpler to understand. I'm proposing another name which doesn't have any ambiguity with the existing maxExecutors param, but I'm open to any other name proposal.
jenkins, test this please
Test build #88180 has finished for PR 19881 at commit
@jcuquemelle please fix the style
docs/configuration.md
Outdated
<td>
By default, the dynamic allocation will request enough executors to maximize the
parallelism according to the number of tasks to process. While this minimizes the
latency of the job, with small tasks this setting wastes a lot of resources due to
can waste.
done
docs/configuration.md
Outdated
executor allocation overhead, as some executor might not even do any work.
This setting allows to set a divisor that will be used to reduce the number of
executors w.r.t. full parallelism
Defaults to 1.0
I think we should define that maxExecutors trumps this setting.
If I have 10000 tasks and a divisor of 2, I would expect 5000 executors, but if maxExecutors is 1000, that is all I get.
We should add a test for this interaction as well.
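A standalone sketch of that expected interaction, using plain arithmetic rather than the actual ExecutorAllocationManagerSuite helpers:

```scala
// Standalone illustration (not the suite's actual helpers) of the expected interaction:
// the divisor halves the 10000-task target, but maxExecutors still wins.
val tasks = 10000
val tasksPerExecutor = 1          // 1 core per executor, 1 CPU per task
val divisor = 2.0
val maxExecutors = 1000

val throttled = math.ceil(tasks / tasksPerExecutor / divisor).toInt  // 5000
val target = math.min(throttled, maxExecutors)                       // capped to 1000
assert(target == 1000)
```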
Done
docs/configuration.md
Outdated
latency of the job, with small tasks this setting wastes a lot of resources due to
executor allocation overhead, as some executor might not even do any work.
This setting allows to set a divisor that will be used to reduce the number of
executors w.r.t. full parallelism
add period at end of parallelism
done
docs/configuration.md
Outdated
@@ -1795,6 +1796,19 @@ Apart from these, the following properties are also available, and may be useful
Lower bound for the number of executors if dynamic allocation is enabled.
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.fullParallelismDivisor</code></td>
Naming configs is really hard, there are lots of different opinions on it, and in the end someone is going to be confused; I need to think about this some more. I see the reason to use "parallelism" here rather than maxExecutors (maxExecutorsDivisor could be confusing if people think it applies to the maxExecutors config), but I also think "parallelism" would be confused with spark.default.parallelism: it's not defining the number of tasks, but the number of executors to allocate based on the parallelism. Another one I thought of is executorAllocationDivisor. I'll think about it some more and get back.
How about something like fullAllocationDivisor or fullExecutorAllocationDivisor? I think the naming should reflect the fact that it is a divisor w.r.t. the full possible parallelism/number of executors.
Sorry I didn't get back to this earlier; I think fullExecutorAllocationDivisor would be fine.
Done
could you update the PR title and description to fit the new approach?
Force-pushed from b2e24f3 to c6641c1.
@felixcheung: updated PR title and description
jenkins, test this please
jenkins, ok to test
Test build #88472 has finished for PR 19881 at commit
var manager = sc.executorAllocationManager.get
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 20)))
for (i <- 0 to 5) {
  addExecutors(manager)
This loop isn't really needed, right? All we are checking is the target, not the number to add?
If we want to check the capping by max/min executors, we need to actually try to add executors. The max/min capping does not occur during the computation of the target number of executors, but at the time they are added.
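A simplified sketch of that split, assuming the target computation is unclamped and the min/max bounds are only enforced on the add path; the names are illustrative, not the manager's actual methods:

```scala
// Illustrative only: the raw target derived from the task count is unclamped...
def rawTarget(tasks: Int, tasksPerExecutor: Int): Int =
  math.ceil(tasks.toDouble / tasksPerExecutor).toInt

// ...and the clamping to minExecutors/maxExecutors only happens when executors
// are actually requested, which is why the test has to go through addExecutors.
def clampOnAdd(rawTarget: Int, minExecutors: Int, maxExecutors: Int): Int =
  math.max(minExecutors, math.min(rawTarget, maxExecutors))
```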
ok
Just a minor comment about the test, otherwise looks good.
@@ -116,9 +120,12 @@ private[spark] class ExecutorAllocationManager(
// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be attained differently for different cluster managers
private val tasksPerExecutor =
private val tasksPerExecutorForFullParallelism =
We don't really need this variable now, can we just remove it?
It is used in two places: one to validate arguments and the other to actually compute the target number of executors. If I remove this variable, I will need to either store spark.executor.cores and spark.task.cpus instead, or fetch them each time we do a validation or a computation of the target number of executors.
@jiangxb1987, do you agree with my comment, or do you still want me to remove the variable?
I was originally thinking we might avoid introducing the concept of tasksPerExecutorForFullParallelism and rather only have executorCores and taskCpus, but I don't have a strong opinion on that.
This is not exposed; it is merely a more precise description of the actual computation. I just wanted to state more clearly that the existing default behavior is maximizing the parallelism.
ok
+1
cc @rxin
I'll leave this a bit longer but then I'm going to merge it later today
Can you wait another day? I just find the name pretty weird. Do we have other configs that use the “divisor” suffix?
Yes, we can wait another day or so if you are looking at it; this discussion has been going on for a long time now, though. If you have a better name suggestion, let us know. No other configs have a "divisor" suffix.
@rxin, can we merge this PR?
Maybe instead of "divisor", we just have a "rate" or "factor" that can be a floating point value, and use multiplication rather than division? This way people can also make it even more aggressive.
@rxin: making it more aggressive must be forbidden, because a setting of 1.0 already gives enough executors that, if executor provisioning were perfect (e.g. all executors were available at the same time) and the mapping of tasks to executors were optimal, each executor core (or taskSlot, as in the original naming) would process exactly one task. If you ask for more executors, you can be sure they will be wasted.
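A worked example of that argument, with numbers chosen purely for illustration (4 cores per executor, 1 CPU per task):

```scala
// Illustrative numbers (mine): 400 pending tasks, 4 cores per executor, 1 CPU per task.
val tasks = 400
val taskSlotsPerExecutor = 4 / 1                                        // executor cores / task CPUs
val fullTarget = math.ceil(tasks.toDouble / taskSlotsPerExecutor).toInt // 100 executors, one task per core

// Anything "more aggressive" than 1.0, e.g. a factor of 2.0, would request
// 200 executors (800 cores) for only 400 tasks, so at least 400 cores sit idle.
val aggressiveTarget = math.ceil(fullTarget * 2.0).toInt                // 200
```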
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) | ||
|
||
private val fullExecutorAllocationDivisor = | ||
conf.getDouble("spark.dynamicAllocation.fullExecutorAllocationDivisor", 1.0) |
I forgot about this earlier, but this should really be a config defined similarly to DYN_ALLOCATION_MIN_EXECUTORS.
@rxin I assume you are just trying not to use "divisor" since it's not used anywhere else? As @jcuquemelle stated, I don't see a use case for making this more aggressive; if you have one, please let us know, but otherwise it just wastes resources. Personally I still like "divisor" because that is what you are doing. I don't think the fact that it's not in any other configs is a good reason not to use it. Looking at it, I don't see any public configs with "factor" in the name either. I am not fond of "rate" because it's not a rate (i.e. how quickly/slowly you are allocating); it's a limit on the max number of executors. I also think it's more natural for people to think of this as a divisor vs a multiplier: if I want 1/2 of the executors, I divide by 2. I think we should name it based on what is most likely understood by the end user.
SGTM on divisor. Do we need "full" there in the config?
No, we don't strictly need it in the name; the reasoning behind it was to indicate that this is a divisor relative to having fully allocated executors for all the tasks, i.e. running at full parallelism.
I thought about this more, and I actually think something like this makes more sense: What do you think?
I'm fine with that
Ok, will quickly do the change
Thanks @jcuquemelle
Test build #89467 has finished for PR 19881 at commit
@@ -26,7 +26,10 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
import org.apache.spark.internal.config.{
I would just make this import org.apache.spark.internal.config._
Done
docs/configuration.md
Outdated
@@ -1751,6 +1751,7 @@ Apart from these, the following properties are also available, and may be useful
<code>spark.dynamicAllocation.minExecutors</code>,
<code>spark.dynamicAllocation.maxExecutors</code>, and
<code>spark.dynamicAllocation.initialExecutors</code>
<code>spark.dynamicAllocation.fullExecutorAllocationDivisor</code>
This needs to be changed to executorAllocationRatio.
Done, missed that one, sorry :-)
…executors. Let's say an executor has spark.executor.cores / spark.task.cpus taskSlots. The current dynamic allocation policy allocates enough executors to have each taskSlot execute a single task, which wastes resources when tasks are small relative to the executor allocation overhead. By adding tasksPerExecutorSlot, it becomes possible to specify how many tasks a single slot should ideally execute, to mitigate the overhead of executor allocation.
This allows for a different semantic, which yields a simpler explanation and allows treating this parameter as a double for finer control. Unit tests have been updated to actually test the number of executors and have been refactored.
fixed / updated doc
Force-pushed from 15732ab to 3b1dddc.
Test build #89712 has finished for PR 19881 at commit
+1
What changes were proposed in this pull request?
By default, the dynamic allocation will request enough executors to maximize the
parallelism according to the number of tasks to process. While this minimizes the
latency of the job, with small tasks this setting can waste a lot of resources due to
executor allocation overhead, as some executors might not even do any work.
This setting allows setting a ratio that will be used to reduce the number of
target executors w.r.t. full parallelism.
The number of executors computed with this setting is still fenced by
spark.dynamicAllocation.maxExecutors and spark.dynamicAllocation.minExecutors.
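A condensed sketch of the resulting computation as described above; the helper below is illustrative, not the exact code added by the PR:

```scala
// Illustrative helper: target derived from the ratio, fenced by min/max executors.
def targetExecutors(
    runningOrPendingTasks: Int,
    tasksPerExecutorForFullParallelism: Int, // executor cores / task CPUs
    executorAllocationRatio: Double,         // spark.dynamicAllocation.executorAllocationRatio
    minExecutors: Int,                       // spark.dynamicAllocation.minExecutors
    maxExecutors: Int): Int = {              // spark.dynamicAllocation.maxExecutors
  val maxNeeded = math.ceil(
    runningOrPendingTasks * executorAllocationRatio / tasksPerExecutorForFullParallelism).toInt
  math.max(minExecutors, math.min(maxNeeded, maxExecutors))
}
```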
How was this patch tested?
Unit tests and runs on various actual workloads on a YARN cluster.