[SPARK-3367] Remove spark.shuffle.spill.compress #2247
Conversation
Replaced it with existing spark.shuffle.compress.
QA tests have started for PR 2247 at commit
QA tests have finished for PR 2247 at commit
Jenkins, retest this please.
QA tests have started for PR 2247 at commit
Is this being done to work around the bug (the sort shuffle breakage)?
What compression overhead are you talking about?
The overhead of compressing?
I don't get it. Are you saying compression is turned on for spill files but not for general shuffle files? And in response to your original question about the sort shuffle breakage: yes, this change is breaking that. See the discussion in #2178 (comment)
Yes, exactly: sometimes, with a higher block size configured.
Do you have any data on a job in which disabling shuffle compression helps with performance (especially with sort-based shuffle in which the number of concurrent streams is low)? In all cases I've seen, compression improves performance. I'm not sure if we should keep two options to satisfy an extremely rare corner case.
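For concreteness, the corner case under debate is a job that sets the two compression flags to different values, which becomes impossible if one of them is removed. A minimal sketch using the config keys discussed above (the app name and the particular true/false choices are just illustrative):

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration illustrating the corner case being debated:
// shuffle output files are compressed, but spill files written during
// aggregation/sorting are left uncompressed.
val conf = new SparkConf()
  .setAppName("shuffle-compression-example")
  .set("spark.shuffle.compress", "true")        // compress map output / shuffle files
  .set("spark.shuffle.spill.compress", "false") // do not compress spill files
```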
QA tests have finished for PR 2247 at commit
I think we are looking at it the wrong way: this is an exposed interface. It might also break direct usage of ExternalAppendOnlyMap, but since that ...
Yes, unfortunately we currently document this setting.
It's not an interface in the sense that program output doesn't change. It's only a config setting for an optimization. Anyway, I agree with Reynold -- I'd like to see an example where spark.shuffle.spill.compress and spark.shuffle.compress need to be at different values, and see performance numbers for that. It seems to me that you're spilling the same kind of objects in both, so there will be the same tradeoff between I/O and compute time.
I haven't looked at all the details of this, but to me configs are public interfaces. Removing one that someone had used in the past could cause their program to behave differently between minor versions, which might be unexpected. In general I would try to keep things backwards compatible. If this is a direct mapping, why not just have the code check whether either is set?
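One possible reading of the "check whether either is set" suggestion is a fallback lookup that honors the old key if a user set it explicitly. A rough sketch of that idea, not the actual implementation (the helper name is hypothetical, and it assumes both settings default to true):

```scala
import org.apache.spark.SparkConf

// Sketch of the backwards-compatibility idea: prefer an explicitly set
// spark.shuffle.spill.compress value, otherwise fall back to
// spark.shuffle.compress, and finally to the documented default of true.
def spillCompressionEnabled(conf: SparkConf): Boolean =
  conf.getOption("spark.shuffle.spill.compress")
    .orElse(conf.getOption("spark.shuffle.compress"))
    .map(_.toBoolean)
    .getOrElse(true)
```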
I am not sure I follow ... We compress shuffle output with the first setting, while we compress spills with the second. The team has decided to use it, and I don't want to second-guess their choice.
The same data that gets spilled gets shuffled in most operations. Anyway, I'd be okay keeping it, but I'd like to understand why it's important. In general we should be aiming to reduce the number of config settings. Maybe this team is doing something wrong by using it. It's important to know the reason if we want to make a decision about reducing the number of config settings.
Perhaps we should have a separate discussion on the dev mailing list as to what we consider public interfaces? That way everyone is in agreement, or at least is following the same rules, when things get committed.
Sure, I'd be happy to codify that. In general my approach to this has been that we keep all config settings and defaults that affect correctness (whether the program gives the right result), but things that affect performance are fair to change. For example, we've kept spark.serializer defaulting to Java serialization, since changing that default would break a lot of apps. But we've tweaked things like the percentage of memory used for caching and buffer sizes, and we've also made some settings largely irrelevant (e.g. the Akka frame size used to matter a lot, but we moved first task results and then tasks themselves to not be sent through Akka). Anyway, in this case I'm not arguing that this should be removed in 1.X if there are concerns; I'm just trying to understand whether this is a useful setting to have longer-term. In particular, should we deprecate it and remove it in 2.X? So far I haven't seen a strong case that this is a useful setting, but I might be missing some use case.
Ok, I'm going to close this for now given the debate. @mridulm it would be great to hear more about the performance difference; it is hard for me to imagine a scenario in which you would want different settings for the two. @andrewor14 can you fix the bug?
Ok. I have filed a JIRA at https://issues.apache.org/jira/browse/SPARK-3426
… and spark.shuffle.spill.compress settings are different

This PR fixes SPARK-3426, an issue where sort-based shuffle crashes if the `spark.shuffle.spill.compress` and `spark.shuffle.compress` settings have different values.

The problem is that sort-based shuffle's read and write paths use different settings for determining whether to apply compression. ExternalSorter writes runs to files using `TempBlockId` ids, which causes `spark.shuffle.spill.compress` to be used for enabling compression, but these spilled files end up being shuffled over the network and read as shuffle files using `ShuffleBlockId` by BlockStoreShuffleFetcher, which causes `spark.shuffle.compress` to be used for enabling decompression. As a result, this leads to errors when these settings disagree.

Based on the discussions in #2247 and #2178, it sounds like we don't want to remove the `spark.shuffle.spill.compress` setting. Therefore, I've tried to come up with a fix where `spark.shuffle.spill.compress` is used to compress data that's read and written locally and `spark.shuffle.compress` is used to compress any data that will be fetched / read as shuffle blocks.

To do this, I split `TempBlockId` into two new id types, `TempLocalBlockId` and `TempShuffleBlockId`, which map to `spark.shuffle.spill.compress` and `spark.shuffle.compress`, respectively. ExternalAppendOnlyMap also used temp blocks for spilling data. It looks like ExternalSorter was designed to be a generic sorter, but its configuration already happens to be tied to sort-based shuffle, so I think it's fine if we use `spark.shuffle.compress` to compress its spills; we can move the compression configuration to the constructor in a later commit if we find that ExternalSorter is being used in other contexts where we want different configuration options to control compression.

To summarize:

**Before:**

|       | ExternalAppendOnlyMap        | ExternalSorter               |
|-------|------------------------------|------------------------------|
| Read  | spark.shuffle.spill.compress | spark.shuffle.compress       |
| Write | spark.shuffle.spill.compress | spark.shuffle.spill.compress |

**After:**

|       | ExternalAppendOnlyMap        | ExternalSorter         |
|-------|------------------------------|------------------------|
| Read  | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.compress |

Thanks to andrewor14 for debugging this with me!

Author: Josh Rosen <[email protected]>

Closes #2890 from JoshRosen/SPARK-3426 and squashes the following commits:

1921cf6 [Josh Rosen] Minor edit for clarity.
c8dd8f2 [Josh Rosen] Add comment explaining use of createTempShuffleBlock().
2c687b9 [Josh Rosen] Fix SPARK-3426.
91e7e40 [Josh Rosen] Combine tests into single test of all combinations
76ca65e [Josh Rosen] Add regression test for SPARK-3426.

Conflicts:
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
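To illustrate the mechanism described above, here is a simplified, self-contained sketch of choosing compression per block id type after the split into `TempLocalBlockId` and `TempShuffleBlockId`. It is not the actual Spark code: the local class and field names (`CompressionPolicy`, `compressShuffle`, `compressShuffleSpill`) are stand-ins for the two boolean settings.

```scala
import java.util.UUID

// Simplified sketch: decide whether a block's on-disk bytes should be
// compressed based on the kind of block id, so that the write path and the
// read path of sort-based shuffle agree on the same setting.
sealed trait BlockId
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId
case class TempLocalBlockId(id: UUID) extends BlockId   // spills read back only locally
case class TempShuffleBlockId(id: UUID) extends BlockId // spills later served as shuffle data

class CompressionPolicy(compressShuffle: Boolean, compressShuffleSpill: Boolean) {
  // compressShuffle      <- spark.shuffle.compress
  // compressShuffleSpill <- spark.shuffle.spill.compress
  def shouldCompress(blockId: BlockId): Boolean = blockId match {
    case _: ShuffleBlockId     => compressShuffle      // fetched over the network
    case _: TempShuffleBlockId => compressShuffle      // will be read as shuffle blocks
    case _: TempLocalBlockId   => compressShuffleSpill // read and written locally only
    case _                     => false
  }
}
```

With this split, the bytes a sorter spills and later serves as shuffle output are compressed and decompressed under the same `spark.shuffle.compress` setting, so the two configs can differ without corrupting the stream.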