SPARK-2787: Make sort-based shuffle write files directly when there's no sorting/aggregation and # partitions is small #1799

Closed
wants to merge 7 commits into from

mateiz
Contributor

@mateiz mateiz commented Aug 6, 2014

As described in https://issues.apache.org/jira/browse/SPARK-2787, right now sort-based shuffle is more expensive than hash-based for map operations that do no partial aggregation or sorting, such as groupByKey. This is because it has to serialize each data item twice (once when spilling to intermediate files, and then again when merging these files object-by-object). This patch adds a code path to just write separate files directly if the # of output partitions is small, and concatenate them at the end to produce a sorted file.

On the unit test side, I added some tests that force or don't force this bypass path to be used, and checked that our tests for other features (e.g. all the operations) cover both cases.
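
To make the idea concrete, here is a minimal, self-contained sketch of the bypass path: each record is serialized once, straight into the file for its partition, and the per-partition files are then concatenated in partition order. This is not the code in `ExternalSorter`; the object name, `partitionOf`, `writePartitioned`, and the use of Java serialization are all assumptions made for illustration.

```scala
import java.io.{File, FileInputStream, FileOutputStream, ObjectOutputStream}
import java.nio.file.Files

// Illustrative only: names and serialization format are assumptions, not Spark's API.
object BypassMergeSortSketch {
  /** Hypothetical stand-in for a partitioner: non-negative hash modulo partition count. */
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  /**
   * Writes each record directly to the file for its partition (one serialization
   * pass, no sorting), then concatenates the files in partition order.
   * Returns the byte length of each partition's segment in the output file.
   */
  def writePartitioned(
      records: Iterator[(String, Int)],
      numPartitions: Int,
      output: File): Array[Long] = {
    val tmpDir = Files.createTempDirectory("bypass-sketch").toFile
    val partFiles = Array.tabulate(numPartitions)(i => new File(tmpDir, s"part-$i"))
    val writers = partFiles.map(f => new ObjectOutputStream(new FileOutputStream(f)))
    try {
      for ((k, v) <- records) {
        val out = writers(partitionOf(k, numPartitions))
        out.writeObject(k)
        out.writeInt(v)
      }
    } finally writers.foreach(_.close())

    // Concatenate the per-partition files by copying raw bytes; nothing is deserialized.
    val lengths = new Array[Long](numPartitions)
    val out = new FileOutputStream(output)
    try {
      for (i <- 0 until numPartitions) {
        val in = new FileInputStream(partFiles(i))
        try {
          val buf = new Array[Byte](8192)
          var n = in.read(buf)
          while (n != -1) {
            out.write(buf, 0, n)
            lengths(i) += n
            n = in.read(buf)
          }
        } finally in.close()
        partFiles(i).delete()
      }
    } finally out.close()
    tmpDir.delete()
    lengths
  }
}
```

The sketch keeps one open file per output partition, which is why an approach like this is only reasonable when the number of partitions is small. The returned segment lengths are what a reader would need to locate a given partition inside the concatenated file.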

@SparkQA

SparkQA commented Aug 6, 2014

QA tests have started for PR 1799. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17985/consoleFull

@SparkQA

SparkQA commented Aug 6, 2014

QA results for PR 1799:
- This patch FAILED unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17985/consoleFull

@SparkQA

SparkQA commented Aug 6, 2014

QA tests have started for PR 1799. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17999/consoleFull

@mateiz
Contributor Author

mateiz commented Aug 6, 2014

test this please

@SparkQA

SparkQA commented Aug 6, 2014

QA tests have started for PR 1799. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18000/consoleFull

@SparkQA

SparkQA commented Aug 6, 2014

QA results for PR 1799:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18000/consoleFull

@mateiz
Contributor Author

mateiz commented Aug 6, 2014

test this please

@SparkQA

SparkQA commented Aug 6, 2014

QA tests have started for PR 1799. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18009/consoleFull

@SparkQA

SparkQA commented Aug 6, 2014

QA results for PR 1799:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18009/consoleFull

@mateiz
Contributor Author

mateiz commented Aug 6, 2014

test this please

@mateiz
Contributor Author

mateiz commented Aug 6, 2014

@rxin / @andrewor14 it would be good if you could review this when you have a chance. This is something we should add in 1.1 since sort-based shuffle is still off by default.

@mateiz
Contributor Author

mateiz commented Aug 6, 2014

BTW the test failures both times were in a Flume test for streaming, which might just be flaky.

@SparkQA

SparkQA commented Aug 6, 2014

QA tests have started for PR 1799. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18036/consoleFull

@andrewor14
Contributor

Yeah, the flaky tests are fixed in #1803

@mateiz
Contributor Author

mateiz commented Aug 6, 2014

Ah cool, glad it's being fixed.

@SparkQA

SparkQA commented Aug 6, 2014

QA results for PR 1799:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18036/consoleFull

  val shortShuffleMgrNames = Map(
    "HASH" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "SORT" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
  val shuffleMgrName = conf.get("spark.shuffle.manager", "HASH")
Contributor

Can we make this case-insensitive?

@mateiz
Contributor Author

mateiz commented Aug 7, 2014

Thanks; updated to deal with comments.

@SparkQA

SparkQA commented Aug 7, 2014

QA tests have started for PR 1799. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18096/consoleFull

@SparkQA

SparkQA commented Aug 7, 2014

QA results for PR 1799:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18096/consoleFull

@mateiz
Contributor Author

mateiz commented Aug 8, 2014

@rxin does this look okay?

@rxin
Contributor

rxin commented Aug 8, 2014

LGTM

@rxin
Contributor

rxin commented Aug 8, 2014

I'm merging this in master & branch-1.1 (since sort-based is disabled by default)

@mateiz
Contributor Author

mateiz commented Aug 8, 2014

Alright, thanks. Going to merge it.

@asfgit asfgit closed this in 6906b69 Aug 8, 2014
asfgit pushed a commit that referenced this pull request Aug 8, 2014
… no sorting/aggregation and # partitions is small

Author: Matei Zaharia <[email protected]>

Closes #1799 from mateiz/SPARK-2787 and squashes the following commits:

88cf26a [Matei Zaharia] Fix rebase
10233af [Matei Zaharia] Review comments
398cb95 [Matei Zaharia] Fix looking up shuffle manager in conf
ca3efd9 [Matei Zaharia] Add docs for shuffle manager properties, and allow short names for them
d0ae3c5 [Matei Zaharia] Fix some comments
90d084f [Matei Zaharia] Add code path to bypass merge-sort in ExternalSorter, and tests
31e5d7c [Matei Zaharia] Move existing logic for writing partitioned files into ExternalSorter

(cherry picked from commit 6906b69)
Signed-off-by: Reynold Xin <[email protected]>

  val shortShuffleMgrNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
  val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
Contributor

I ran into a problem using these short names: in ShuffleBlockManager, there's a line that looks at the spark.shuffle.manager property to see whether we're using sort-based shuffle:

  // Are we using sort-based shuffle?
  val sortBasedShuffle =
    conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName

This won't work properly if the configuration property is set to one of the short names.

We can't just re-assign the property to the full name because the BlockManager will have already been created by this point and it will have created the ShuffleBlockManager with the wrong property value. Similarly, the ShuffleBlockManager can't access SparkEnv to inspect the actual ShuffleManager because it won't be fully initialized.

I think we should perform all configuration normalization / mutation at a single top-level location and then treat the configuration as immutable from that point forward, since that seems easier to reason about. What do you think about moving the aliasing / normalization to the top of SparkEnv?

Contributor Author

I'd rather not change the configuration under the user, since that would be confusing if they later print it or look in the web UI. Instead, maybe add a SparkEnv.getShuffleManagerClass(conf: SparkConf) that can return the real class name.

Also I'd be fine initializing the ShuffleBlockManager after the ShuffleManager if that works, and using isInstanceOf. That would be the cleanest.
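
A rough sketch of the first suggestion, for illustration: a lookup helper resolves short names to class names without touching the stored configuration, and the sort-based check compares against the resolved name. To keep it runnable without Spark, a plain `Map[String, String]` stands in for `SparkConf`, and the object name `ShuffleManagerLookup` and the `isSortBasedShuffle` method are assumptions rather than anything in the patch.

```scala
import java.util.Locale

// Illustrative only: a plain Map stands in for SparkConf (assumption).
object ShuffleManagerLookup {
  private val SortManagerClass = "org.apache.spark.shuffle.sort.SortShuffleManager"

  // Short names as shown in the snippet under review.
  private val shortNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> SortManagerClass)

  /**
   * Stand-in for the proposed SparkEnv.getShuffleManagerClass(conf: SparkConf).
   * Resolves a short name to its class name; any other value is assumed to be
   * a fully qualified class name already. The settings are only read, never changed.
   */
  def getShuffleManagerClass(settings: Map[String, String]): String = {
    val name = settings.getOrElse("spark.shuffle.manager", "hash")
    shortNames.getOrElse(name.toLowerCase(Locale.ROOT), name)
  }

  /** Hypothetical replacement for the string comparison in ShuffleBlockManager. */
  def isSortBasedShuffle(settings: Map[String, String]): Boolean =
    getShuffleManagerClass(settings) == SortManagerClass
}
```

With this shape, a setting of `sort` and a setting of the full `SortShuffleManager` class name both count as sort-based, while the value the user sees when printing the configuration stays exactly as they wrote it.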

xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
… no sorting/aggregation and # partitions is small
