
[SPARK-24519] Make the threshold for highly compressed map status configurable #21527

Closed
wants to merge 5 commits

Conversation


@hthuynh2 hthuynh2 commented Jun 11, 2018

Problem
MapStatus uses a hardcoded value of 2000 partitions to determine whether it should use a highly compressed map status. We should make it configurable to allow users to more easily tune their jobs with respect to this, without requiring them to modify their code to change the number of partitions. Note we can leave this as an internal/undocumented config for now, until we have more advice for users on how to set it.
Some of my reasoning:
The config gives you a way to easily change something without the user having to change code, redeploy the jar, and then run again. You can simply change the config and rerun. It also allows for easier experimentation. Changing the # of partitions has other side effects, whether good or bad is situation dependent. It can be worse: you could be increasing the # of output files when you don't want to, it affects the # of tasks needed and thus the executors required to run in parallel, etc.
There have been various talks about this number at Spark summits where people have told customers to increase it to 2001 partitions. Note if you just do a search for "spark 2000 partitions" you will find various things all talking about this number. This shows that people are modifying their code to take this into account, so it seems to me having this configurable would be better.
Once we have more advice for users we could expose this and document information on it.

What changes were proposed in this pull request?
I made the hardcoded value mentioned above configurable under the name SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, with a default value of 2000. Users can set it to the value they want via the property spark.shuffle.minNumPartitionsToHighlyCompress.
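
For illustration, a user could then override the threshold without touching job code (a hypothetical example; the config is internal/undocumented, and the value 1000 is arbitrary):

import org.apache.spark.SparkConf

// Override the internal threshold (default: 2000) so HighlyCompressedMapStatus
// kicks in for shuffles with more than 1000 partitions.
val conf = new SparkConf()
  .set("spark.shuffle.minNumPartitionsToHighlyCompress", "1000")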

How was this patch tested?
I wrote a unit test to make sure that the default value is 2000, and that an IllegalArgumentException is thrown if a user sets it to a non-positive value. The unit test also checks that the highly compressed map status is correctly used when the number of partitions is greater than SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.

@hthuynh2 hthuynh2 changed the title Spark branch 1 [SPARK-24519] MapStatus has 2000 hardcoded Jun 11, 2018
@hthuynh2
Author

@tgravescs

@tgravescs
Contributor

ok to test

@SparkQA

SparkQA commented Jun 11, 2018

Test build #91669 has finished for PR 21527 at commit d3f24b5.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@markhamstra
Contributor

> We should make it configurable.

That's a debatable assertion all by itself -- and quite unfortunately, there is no more justification for this claim in the JIRA ticket. Without proof that there is a better configuration under certain circumstances, and without guidance as to why and how this value should be reconfigured, I'd argue that adding another knob for users to twiddle just adds confusion, potential for misconfiguration and more configuration space that is unlikely to be adequately tested.

@tgravescs
Contributor

Yes it is debatable, but making it configurable gives you the option to change it; the only other way is to change the # of partitions, which could be more costly. There have been various talks about this number at Spark summits where people have told customers to increase it to 2001 partitions. Note if you just do a search for "spark 2000 partitions" you will find various things all talking about this number.

Personally I would like to see it configurable. If enough people disagree we can just close this. I have seen a few users bump up the number of partitions when they were close to 2001 to get the memory saving, but I don't have any hard numbers.

@squito @vanzin have you seen people changing # of partitions where this config would be useful?

@markhamstra
Contributor

@tgravescs If there is value in making it configurable, that is all fine and good. My argument is against making it configurable just for the sake of making it configurable. If there is more going on than that, then at a minimum we need documentation explaining why and when this number should be changed, and suggestions of appropriate values.

@squito
Contributor

squito commented Jun 11, 2018

I tell users 2001 partitions all the time. I haven't bothered trying to make it configurable in the past because I dunno what else you'd set this value to -- normally I think it would be better for them to bump up the number of partitions anyway. Probably there are some cases where you'd rather change the conf rather than change the number of partitions?

agree with Mark, would be nice for the jira & pr description to explain why this is useful.

@SparkQA

SparkQA commented Jun 12, 2018

Test build #91675 has finished for PR 21527 at commit 4c8acfa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

we can definitely update the description with more details.

Personally I'm not fond of any hardcoded magic number like this that you can't override with at least an internal config (meaning leaving it undocumented and only for special cases). It gives you a way to easily change something without the user having to change code, redeploy the jar, and then run again. You can simply change the config and rerun. It also allows for easier experimentation. Changing the # of partitions has other side effects, whether good or bad is situation dependent. It can be worse: you could be increasing the # of output files when you don't want to, it affects the # of tasks needed and thus the executors required to run in parallel, etc.

If no one else has seen a situation for this, I'm ok with closing for now until we have more concrete data. Really, perhaps that effort should be turned into just improving the map status in general so we don't need two kinds.

@tgravescs
Contributor

test this please

@markhamstra
Contributor

> we can definitely update the description with more details.

Eventually, some of the motivation and advice/suggestions need to get into the main user docs, as well.

@squito
Contributor

squito commented Jun 12, 2018

I'm totally onboard with exposing it, though I'd say undocumented unless we have some concrete advice on how to set it. I was just explaining my experience so far and why I haven't bothered with changing it.

@SparkQA

SparkQA commented Jun 12, 2018

Test build #91724 has finished for PR 21527 at commit 4c8acfa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

test this please

@tgravescs
Contributor

test failure seems to be reported here: https://issues.apache.org/jira/browse/SPARK-23369

@SparkQA

SparkQA commented Jun 14, 2018

Test build #91850 has finished for PR 21527 at commit 4c8acfa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

So as the PR stands, I think we don't document this config in the .md file; other than that, we need to update the description on the PR and the jira. @markhamstra are you ok with that? If we decide to expose the config later, we can update the docs with more tuning information for users.

@markhamstra
Contributor

Sure, as long as we are not telling users that this is something that they can or should use, that's fine.

@mridulm
Contributor

mridulm commented Jun 18, 2018

+1 on making this configurable.
Like @tgravescs, I don't like hardcoded constants - all for making it a private config not necessarily exposed to users.
It will allow developers to tune it as required, and to improve the value over time as constraints change.

@tgravescs
Contributor

I updated the jira description. @hthuynh2 please update the description on the PR to match.

@hthuynh2
Author

I updated it. Thanks.

private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
  ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
    .doc("Number of partitions to determine if MapStatus should use HighlyCompressedMapStatus")
    .intConf
Contributor

add in .internal() to the config so it's recorded as being an internal config for now.
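
For reference, a minimal sketch of the config with that change applied, assuming the standard ConfigBuilder chain; the checkValue guard mirrors the IllegalArgumentException behavior described in the PR, and its message wording is illustrative:

private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
  ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
    .internal()
    .doc("Number of partitions to determine if MapStatus should use HighlyCompressedMapStatus")
    .intConf
    .checkValue(v => v > 0, "The value should be a positive integer.")
    .createWithDefault(2000)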

@hthuynh2
Author

@tgravescs I updated it. Thanks.

@tgravescs
Contributor

+1. Leave this open for a bit to see if anyone else has comments.

@@ -188,4 +188,37 @@ class MapStatusSuite extends SparkFunSuite {
assert(count === 3000)
}
}

test("YSPARK-500 MapStatus has 2000 hardcoded") {
Contributor

rename to "HighlyCompressedMapStatus has configurable threshold"

Contributor

there's also a typo in the bug number.

}
}
// Test positive values
for(s <- 1 to 3000) {
Contributor

this is overkill, especially since you're creating a SparkContext in each test. (Look at how long this takes compared to other tests: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91850/testReport/org.apache.spark.scheduler/MapStatusSuite/)

you really don't need a SparkContext for these tests, I'd just set a SparkEnv -- just use mockito to create a mock which returns the conf, and be sure to unset it at the end of the test in a finally. That's probably enough that the runtime will be small, but in any case, just a few cases are probably enough:

Seq(1, 100, 499, 500, 501).foreach { s => ...
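
A hypothetical sketch of that approach (threshold of 500 chosen to match the suggested cases; assumes the test lives in the org.apache.spark.scheduler package so the MapStatus classes are visible):

import org.mockito.Mockito.{doReturn, mock}
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.config
import org.apache.spark.storage.BlockManagerId

val conf = new SparkConf()
  .set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.key, "500")
val env = mock(classOf[SparkEnv])
doReturn(conf).when(env).conf
SparkEnv.set(env)  // no SparkContext needed; MapStatus reads the conf via SparkEnv
try {
  Seq(1, 100, 499, 500, 501).foreach { numParts =>
    val status = MapStatus(BlockManagerId("exec-1", "host", 7337),
      Array.fill(numParts)(1000L))
    // Strictly more partitions than the threshold selects HighlyCompressedMapStatus.
    if (numParts > 500) {
      assert(status.isInstanceOf[HighlyCompressedMapStatus])
    } else {
      assert(status.isInstanceOf[CompressedMapStatus])
    }
  }
} finally {
  SparkEnv.set(null)  // unset so later tests see a clean environment
}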

@vanzin
Contributor

vanzin commented Jun 20, 2018

Also could you change the PR title to describe the change, not the problem.

"Make the threshold for highly compressed map status configurable." Or something.

@SparkQA

SparkQA commented Jun 20, 2018

Test build #92137 has finished for PR 21527 at commit 179fb16.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hthuynh2 hthuynh2 changed the title [SPARK-24519] MapStatus has 2000 hardcoded [SPARK-24519] Make the threshold for highly compressed map status configurable Jun 21, 2018
@hthuynh2
Author

@vanzin @squito Thanks for the comments. I updated the PR, please have a look and let me know if anything needs to be changed. Thank you.

@SparkQA

SparkQA commented Jun 21, 2018

Test build #92154 has finished for PR 21527 at commit 4cb492e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

test this please

Contributor

@squito squito left a comment

lgtm assuming tests pass

@SparkQA

SparkQA commented Jun 21, 2018

Test build #92169 has finished for PR 21527 at commit 4cb492e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

merged to master, thanks @hthuynh2

@asfgit asfgit closed this in 39dfaf2 Jun 22, 2018
@@ -50,7 +50,9 @@ private[spark] sealed trait MapStatus {
 private[spark] object MapStatus {

   def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
-    if (uncompressedSizes.length > 2000) {
+    if (uncompressedSizes.length > Option(SparkEnv.get)
Contributor

this should be done once, rather than for every constructor, shouldn't it? might introduce a hot codepath for very large workloads here.

Contributor

btw this creates the impression that the value can be modified at runtime, but in practice the executors generate these MapStatuses, so it can't really be changed on executors.

Author

How about creating a "static" val shuffleMinNumPartsToHighlyCompress for this? Please let me know if this is good for you so I can update it. Thanks.

Contributor

yes would be better to be done once, please file a jira.

Contributor

the only tricky thing is how to write the test cases for this.
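
A rough sketch of the read-once idea discussed above (illustrative only -- the actual change was deferred to a follow-up jira, and as noted, the caching is exactly what makes the test cases tricky):

private[spark] object MapStatus {

  // Read the threshold once per JVM rather than on every MapStatus construction;
  // fall back to the default when no SparkEnv is set (e.g. in some tests).
  private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get)
    .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
    .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
}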
