SPARK-2787: Make sort-based shuffle write files directly when there's no sorting/aggregation and # partitions is small #1799
Conversation
QA tests have started for PR 1799. This patch DID NOT merge cleanly!
QA results for PR 1799:
QA tests have started for PR 1799. This patch merges cleanly.
test this please
QA tests have started for PR 1799. This patch merges cleanly.
QA results for PR 1799:
test this please
QA tests have started for PR 1799. This patch merges cleanly.
QA results for PR 1799:
test this please
@rxin / @andrewor14 it would be good if you could review this when you have a chance. This is something we should add in 1.1 since sort-based shuffle is still off by default.
BTW the test failures both times were in a Flume test for streaming, which might just be flaky.
QA tests have started for PR 1799. This patch merges cleanly.
Yeah the flaky tests are fixed here: #1803
Ah cool, glad it's being fixed.
QA results for PR 1799:
val shortShuffleMgrNames = Map(
  "HASH" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "SORT" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "HASH")
can we make this case insensitive?
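For illustration, a minimal sketch of how a case-insensitive lookup of these short names could work; the getOrElse fallback to a fully qualified class name is an assumption for the sketch, not necessarily the merged code:

// Sketch: resolve spark.shuffle.manager case-insensitively, accepting either
// a short name ("hash"/"sort") or a fully qualified class name (assumed fallback).
val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)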
Also renamed ExternalSorter.write(Iterator) to insertAll, to match ExternalAppendOnlyMap
Thanks; updated to deal with comments.
QA tests have started for PR 1799. This patch merges cleanly.
QA results for PR 1799:
@rxin does this look okay?
LGTM
I'm merging this in master & branch-1.1 (since sort-based is disabled by default)
Alright, thanks. Going to merge it.
… no sorting/aggregation and # partitions is small

As described in https://issues.apache.org/jira/browse/SPARK-2787, right now sort-based shuffle is more expensive than hash-based for map operations that do no partial aggregation or sorting, such as groupByKey. This is because it has to serialize each data item twice (once when spilling to intermediate files, and then again when merging these files object-by-object). This patch adds a code path to just write separate files directly if the # of output partitions is small, and concatenate them at the end to produce a sorted file.

On the unit test side, I added some tests that force or don't force this bypass path to be used, and checked that our tests for other features (e.g. all the operations) cover both cases.

Author: Matei Zaharia <[email protected]>

Closes #1799 from mateiz/SPARK-2787 and squashes the following commits:

88cf26a [Matei Zaharia] Fix rebase
10233af [Matei Zaharia] Review comments
398cb95 [Matei Zaharia] Fix looking up shuffle manager in conf
ca3efd9 [Matei Zaharia] Add docs for shuffle manager properties, and allow short names for them
d0ae3c5 [Matei Zaharia] Fix some comments
90d084f [Matei Zaharia] Add code path to bypass merge-sort in ExternalSorter, and tests
31e5d7c [Matei Zaharia] Move existing logic for writing partitioned files into ExternalSorter

(cherry picked from commit 6906b69)
Signed-off-by: Reynold Xin <[email protected]>
val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
I ran into a problem using these short names: in ShuffleBlockManager, there's a line that looks at the spark.shuffle.manager property to see whether we're using sort-based shuffle:

// Are we using sort-based shuffle?
val sortBasedShuffle =
  conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName
This won't work properly if the configuration property is set to one of the short names.
We can't just re-assign the property to the full name because the BlockManager will have already been created by this point and it will have created the ShuffleBlockManager with the wrong property value. Similarly, the ShuffleBlockManager can't access SparkEnv to inspect the actual ShuffleManager because it won't be fully initialized.
I think we should perform all configuration normalization / mutation at a single top-level location and then treat the configuration as immutable from that point forward, since that seems easier to reason about. What do you think about moving the aliasing / normalization to the top of SparkEnv?
I'd rather not change the configuration under the user; that would be confusing if they later print it or look at it in the web UI. Instead, maybe add a SparkEnv.getShuffleManagerClass(conf: SparkConf) that can return the real class name.
Also, I'd be fine initializing the ShuffleBlockManager after the ShuffleManager if that works, and using isInstanceOf. That would be the cleanest.
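A rough sketch of the helper suggested here, assuming it lives in SparkEnv and takes only the SparkConf; the name and signature follow the suggestion above and are not the merged code:

import org.apache.spark.SparkConf

// Sketch (assumption, not the merged implementation): map short names to their
// fully qualified class names so callers like ShuffleBlockManager can compare
// against classOf[...].getName without mutating the user's configuration.
def getShuffleManagerClass(conf: SparkConf): String = {
  val shortNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
  val name = conf.get("spark.shuffle.manager", "hash")
  shortNames.getOrElse(name.toLowerCase, name)
}

The check quoted earlier could then compare getShuffleManagerClass(conf) against classOf[SortShuffleManager].getName, regardless of whether the user set a short name or a full class name.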
As described in https://issues.apache.org/jira/browse/SPARK-2787, right now sort-based shuffle is more expensive than hash-based for map operations that do no partial aggregation or sorting, such as groupByKey. This is because it has to serialize each data item twice (once when spilling to intermediate files, and then again when merging these files object-by-object). This patch adds a code path to just write separate files directly if the # of output partitions is small, and concatenate them at the end to produce a sorted file.
On the unit test side, I added some tests that force or don't force this bypass path to be used, and checked that our tests for other features (e.g. all the operations) cover both cases.
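For illustration, a hedged sketch of the kind of check that decides when this bypass path applies; the property name, default threshold, and helper name below are assumptions for the sketch rather than the exact merged code:

import org.apache.spark.SparkConf

// Sketch: bypass the merge-sort path only when the map task does no aggregation
// and no ordering and the number of output partitions is small, so each partition
// can be written to its own file and the files concatenated at the end.
def shouldBypassMergeSort(
    conf: SparkConf,
    numPartitions: Int,
    hasAggregator: Boolean,
    hasOrdering: Boolean): Boolean = {
  // Threshold property name and default value are assumed for illustration.
  val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
  !hasAggregator && !hasOrdering && numPartitions <= bypassMergeThreshold
}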