[SPARK-24519] Make the threshold for highly compressed map status configurable #21527
Conversation
ok to test
Test build #91669 has finished for PR 21527 at commit
That's a debatable assertion all by itself -- and quite unfortunately, there is no more justification for this claim in the JIRA ticket. Without proof that there is a better configuration under certain circumstances, and without guidance as to why and how this value should be reconfigured, I'd argue that adding another knob for users to twiddle just adds confusion, potential for misconfiguration and more configuration space that is unlikely to be adequately tested.
Yes, it is debatable, but making it configurable gives you the option to change it. The only other way is to change the # of partitions, which could be more costly. There have been various talks about this number at Spark Summits where people have told customers to increase it to 2001 partitions. Note that if you just do a search for spark 2000 partitions you will find various things all talking about this number. Personally I would like to see it configurable. If there are enough people who disagree we can just close. I have seen a few users bump up the number of partitions when they were close to 2001 to get the memory saving, but I don't have any hard numbers. @squito @vanzin have you seen people changing the # of partitions where this config would be useful?
@tgravescs If there is value in making it configurable, that is all fine and good. My argument is against making it configurable just for the sake of making it configurable. If there is more going on than that, then at a minimum we need documentation explaining why and when this number should be changed, and suggestions of appropriate values.
I tell users 2001 partitions all the time. I haven't bothered trying to make it configurable in the past because I dunno what else you'd set this value to -- normally I think it would be better for them to bump up the number of partitions anyway. Probably there are some cases where you'd rather change the conf than change the number of partitions? Agree with Mark, it would be nice for the jira & pr description to explain why this is useful.
Test build #91675 has finished for PR 21527 at commit
We can definitely update the description with more details. Personally, I'm not fond of any hardcoded magic number like this; it should at least be overridable with an internal config (meaning leaving it undocumented and only for special cases). A config gives you a way to easily change something without the user having to change code, redeploy the jar, and then run again. You can simply change the config and rerun. It also allows for easier experimentation. Changing the # of partitions has other side effects, whether good or bad is situation dependent. It can be worse: you could be increasing the # of output files when you don't want to be, it affects the # of tasks needed and thus the executors to run in parallel, etc. If no one else has seen a situation for this, I'm ok with closing for now until we have more concrete data. Which really perhaps should be turned into just improving this in general so we don't need 2 kinds.
test this please
Eventually, some of the motivation and advice/suggestions need to get into the main user docs, as well.
I'm totally onboard with exposing it, though I'd say undocumented unless we have some concrete advice on how to set it. I was just explaining my experience so far and why I haven't bothered with changing it.
Test build #91724 has finished for PR 21527 at commit
test this please
test failure seems to be reported here: https://issues.apache.org/jira/browse/SPARK-23369
Test build #91850 has finished for PR 21527 at commit
So I think as the PR stands we don't document this config in the .md file; other than that, we need to update the description on the PR and JIRA. @markhamstra are you ok with that? If we decide to expose the config then we can update the docs with more tuning information for users.
Sure, as long as we are not telling users that this is something that they can or should use, that's fine.
+1 on making this configurable.
I updated the jira description. @hthuynh2 please update the description on the PR to match.
I updated it. Thanks.
private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
  ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
    .doc("Number of partitions to determine if MapStatus should use HighlyCompressedMapStatus")
    .intConf
add in .internal() to the config so it's recorded as an internal config for now.
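For reference, a minimal sketch of what the entry could look like with .internal() added; the checkValue call and its message are illustrative assumptions based on the test description below (non-positive values should raise an IllegalArgumentException), not the exact code from this PR:

private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
  ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
    .internal()
    .doc("Number of partitions to determine if MapStatus should use HighlyCompressedMapStatus")
    .intConf
    .checkValue(v => v > 0, "The value should be a positive integer.")
    .createWithDefault(2000)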
@tgravescs I updated it. Thanks.
+1. Leave this open for a bit to see if anyone else has comments.
@@ -188,4 +188,37 @@ class MapStatusSuite extends SparkFunSuite {
      assert(count === 3000)
    }
  }

  test("YSPARK-500 MapStatus has 2000 hardcoded") {
rename to "HighlyCompressedMapStatus has configurable threshold"
there's also a typo in the bug number.
    }
  }
  // Test positive values
  for(s <- 1 to 3000) {
this is overkill, especially since you're creating a SparkContext in each test. (Look at how long this takes compared to other tests: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91850/testReport/org.apache.spark.scheduler/MapStatusSuite/)
You really don't need a SparkContext for these tests; I'd just set a SparkEnv -- use mockito to create a mock which returns the conf, and be sure to unset it at the end of the test in a finally. That's probably enough that the runtime will be small, but in any case, just a few cases are probably enough:
Seq(1, 100, 499, 500, 501).foreach { s => ...
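To make the suggestion concrete, here is a rough sketch of what such a test could look like inside MapStatusSuite; the test name, the threshold value of 1000, and the exact assertions are illustrative assumptions, not the code from this PR:

import org.mockito.Mockito.{mock, when}

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.config
import org.apache.spark.storage.BlockManagerId

test("SPARK-24519: HighlyCompressedMapStatus has a configurable threshold") {
  // Build a conf with a custom threshold and expose it through a mocked SparkEnv,
  // so MapStatus.apply can read it without spinning up a SparkContext.
  val conf = new SparkConf()
    .set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.key, "1000")
  val env = mock(classOf[SparkEnv])
  when(env.conf).thenReturn(conf)
  SparkEnv.set(env)
  try {
    Seq(1, 100, 999, 1000, 1001).foreach { numParts =>
      val sizes = Array.fill[Long](numParts)(100L)
      val status = MapStatus(BlockManagerId("exec-1", "host", 7337), sizes)
      if (numParts > 1000) {
        assert(status.isInstanceOf[HighlyCompressedMapStatus])
      } else {
        assert(status.isInstanceOf[CompressedMapStatus])
      }
    }
  } finally {
    // Unset the mocked env so other tests are not affected.
    SparkEnv.set(null)
  }
}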
Also could you change the PR title to describe the change, not the problem. "Make the threshold for highly compressed map status configurable." Or something.
Test build #92137 has finished for PR 21527 at commit
Test build #92154 has finished for PR 21527 at commit
test this please
lgtm assuming tests pass
Test build #92169 has finished for PR 21527 at commit
merged to master, thanks @hthuynh2
@@ -50,7 +50,9 @@ private[spark] sealed trait MapStatus {
 private[spark] object MapStatus {

   def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
-    if (uncompressedSizes.length > 2000) {
+    if (uncompressedSizes.length > Option(SparkEnv.get)
this should be done once, rather than for every constructor, shouldn't it? might introduce a hot codepath for very large workloads here.
btw this creates the impression that this can be modified in runtime, but in practice the executors generate these MapStatuses so they can't really be changed on executors.
How about creating a "static" val shuffleMinNumPartsToHighlyCompress for this? Please let me know if this is good for you so I can update it. Thanks.
yes would be better to be done once, please file a jira.
the only tricky thing is how to write the test cases for this.
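For illustration, a rough sketch of what that follow-up could look like, reading the threshold once and caching it; the lazy val name and the fallback to the config's default when no SparkEnv is set are assumptions, not code from this PR (and, as noted above, the caching is exactly what makes the test cases tricky to write):

private[spark] object MapStatus {

  // Resolved once per JVM instead of on every MapStatus.apply call.
  private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get)
    .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
    .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
}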
Problem
MapStatus uses a hardcoded value of 2000 partitions to determine if it should use highly compressed map status. We should make it configurable to allow users to more easily tune their jobs with respect to this without them having to modify their code to change the number of partitions. Note we can leave this as an internal/undocumented config for now until we have more advice for users on how to set it.
Some of my reasoning:
The config gives you a way to easily change something without the user having to change code, redeploy the jar, and then run again. You can simply change the config and rerun. It also allows for easier experimentation. Changing the # of partitions has other side effects, whether good or bad is situation dependent. It can be worse: you could be increasing the # of output files when you don't want to be, it affects the # of tasks needed and thus the executors to run in parallel, etc.
There have been various talks about this number at Spark Summits where people have told customers to increase it to 2001 partitions. Note that if you just do a search for spark 2000 partitions you will find various things all talking about this number. This shows that people are modifying their code to take this into account, so it seems to me having this configurable would be better.
Once we have more advice for users we could expose this and document information on it.
What changes were proposed in this pull request?
I made the hardcoded value mentioned above configurable under the name SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, which has a default value of 2000. Users can set it to the value they want by setting the property spark.shuffle.minNumPartitionsToHighlyCompress.
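For illustration, a sketch of how someone could experiment with the threshold; the value 1000 and the app name are arbitrary examples, and since the config is internal this is experimentation rather than documented guidance:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Lower the threshold so a job with, say, 1500 shuffle partitions already uses
// HighlyCompressedMapStatus, without having to repartition to 2001.
val conf = new SparkConf()
  .setAppName("shuffle-threshold-experiment")  // hypothetical app name
  .set("spark.shuffle.minNumPartitionsToHighlyCompress", "1000")

val spark = SparkSession.builder().config(conf).getOrCreate()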
How was this patch tested?
I wrote a unit test to make sure that the default value is 2000, and that an IllegalArgumentException will be thrown if a user sets it to a non-positive value. The unit test also checks that the highly compressed map status is correctly used when the number of partitions is greater than SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.