
[SPARK-2926][Shuffle]Add MR style sort-merge shuffle read for Spark sort-based shuffle #3438

Closed
wants to merge 27 commits into from

Conversation

jerryshao
Contributor

This is joint work with @sryza. Details and a performance test report can be found in SPARK-2926.

@jerryshao jerryshao changed the title [SPARK-2926][Shuffle]Add MR style sort-merge shuffle read for Spark sort-based shuffle [WIP][SPARK-2926][Shuffle]Add MR style sort-merge shuffle read for Spark sort-based shuffle Nov 25, 2014
@SparkQA

SparkQA commented Nov 25, 2014

Test build #23807 has finished for PR 3438 at commit bfc2614.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 25, 2014

Test build #23809 has finished for PR 3438 at commit 7d839cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class MemoryShuffleBlock(blockId: BlockId, blockData: ManagedBuffer)
    • final class ShuffleBlockFetcherIterator(
    • final class ShuffleRawBlockFetcherIterator(
    • case class DiskShuffleBlock(blockId: BlockId, file: File, len: Long)

@@ -17,6 +17,7 @@

package org.apache.spark.rdd

import scala.collection.mutable
Contributor

Redundant imports

@sryza
Contributor

sryza commented Nov 25, 2014

The main changes we implemented here are:

  • When a shuffle operation has a key ordering, sort records by key on the map side in addition to sorting by partition.
  • On the reduce side, keep blocks in serialized form, and deserialize and merge them when passing to the operation's output iterator. This means that only (# of blocks being merged) records need to be deserialized at any point in time. This part can be found in SortShuffleReader.
  • If the fetched blocks overflow memory, merge them to an on-disk file.
  • Add a TieredDiskMerger that avoids random I/O by merging up to 100 on-disk blocks at once. This should also be usable by ExternalAppendOnlyMap and ExternalSorter.
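The merge described above boils down to a k-way merge over already-sorted iterators. The following is an illustrative sketch of that idea using a priority queue keyed on each iterator's head record; `mergeSortedIterators` is a hypothetical helper for illustration, not the PR's actual MergeUtil or SortShuffleReader code:

```scala
import scala.collection.mutable

// Illustrative k-way merge of already-sorted (K, V) iterators, in the spirit
// of the sort-merge read described above. Only the head record of each input
// needs to be deserialized at any point in time.
def mergeSortedIterators[K, V](iters: Seq[Iterator[(K, V)]])
                              (implicit ord: Ordering[K]): Iterator[(K, V)] = {
  // Order buffered iterators by their head key; reversed because Scala's
  // PriorityQueue is a max-heap and we want the smallest head key first.
  val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]](
    Ordering.by[BufferedIterator[(K, V)], K](_.head._1).reverse)
  iters.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))

  new Iterator[(K, V)] {
    override def hasNext: Boolean = heap.nonEmpty
    override def next(): (K, V) = {
      val it = heap.dequeue()          // input with the smallest head key
      val kv = it.next()
      if (it.hasNext) heap.enqueue(it) // re-insert if records remain
      kv
    }
  }
}
```

For example, merging `Seq(Iterator((1, "a"), (3, "c")), Iterator((2, "b")))` yields the records in key order 1, 2, 3.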

@andrewor14
Contributor

Hi @jerryshao @sryza, what is the status of this? Is it still WIP?

@sryza
Contributor

sryza commented Feb 19, 2015

Hi @andrewor14, this is no longer a WIP. It requires a rebase, but I was hoping to get some feedback on the approach before working on that.

@andrewor14
Contributor

I see. By the way, in general it's good to remove the WIP from the title once it no longer applies, to encourage reviewers to look at this closely.

@sryza
Contributor

sryza commented Feb 19, 2015

Ah yeah, great point. @jerryshao mind updating the title? I don't have access.

@jerryshao
Contributor Author

Yeah, will do, thanks a lot :).

@jerryshao jerryshao changed the title [WIP][SPARK-2926][Shuffle]Add MR style sort-merge shuffle read for Spark sort-based shuffle [SPARK-2926][Shuffle]Add MR style sort-merge shuffle read for Spark sort-based shuffle Feb 20, 2015
@jerryshao jerryshao force-pushed the sort-shuffle-read-new-netty branch from 7d839cd to c3275ff Compare February 22, 2015 06:09
@SparkQA

SparkQA commented Feb 22, 2015

Test build #27831 has finished for PR 3438 at commit c3275ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class MemoryShuffleBlock(blockId: BlockId, blockData: ManagedBuffer)
    • final class ShuffleBlockFetcherIterator(
    • final class ShuffleRawBlockFetcherIterator(
    • case class DiskShuffleBlock(blockId: BlockId, file: File, len: Long)

@jerryshao
Contributor Author

Hi @andrewor14, I've rebased this code against the latest master. Would you please help review it? Thanks a lot.

/** Comparator for mergeSort; if keyOrdering is not available,
 * compare keys by their hash codes. */
private val keyComparator: Comparator[K] = dep.keyOrdering.getOrElse(new Comparator[K] {
  override def compare(a: K, b: K): Int = {
    val (h1, h2) = (a.hashCode(), b.hashCode())
    if (h1 < h2) -1 else if (h1 == h2) 0 else 1
  }
})
Contributor

We never get here, right? Since dep.keyOrdering should always be defined for SortShuffleReader, should this simply throw an exception instead?

@chenghao-intel
Contributor

Thanks @jerryshao @sryza, I have some minor comments on this.
This PR is quite critical for the Spark SQL performance improvement (sort-merge join) in #5208; I'd like to see this PR merged ASAP.
We also benchmarked this PR together with #5208: memory utilization dropped dramatically, and performance was even better than without the two PRs. More benchmark details will be published soon.

cc / @andrewor14 @rxin

/**
 * Notify the merger that no more on-disk blocks will be registered.
 */
def doneRegisteringOnDiskBlocks(): Unit = {
  doneRegistering = true
}
Contributor

There is a deadlock with line 176, so I think we need to move `doneRegistering = true` into `mergeReadyMonitor.synchronized {}`.

Contributor Author

Thanks a lot for your comments @lianhuiwang; we have also hit this issue while running queries. I will fix it ASAP.
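The fix under discussion follows the standard monitor pattern: the flag update and the notify must happen under the same lock that the waiter holds while checking the flag, otherwise the notification can fire between the waiter's check and its wait() and be lost. A minimal sketch of that pattern, under the assumption that a `mergeReadyMonitor` object guards `doneRegistering` as in the snippet above (the surrounding class is hypothetical):

```scala
// Minimal sketch of the missed-notification fix discussed above.
// Both the producer (setting the flag) and the consumer (checking it and
// waiting) synchronize on the same monitor, so no wakeup can be lost.
class TieredMergerSketch {
  private val mergeReadyMonitor = new Object
  private var doneRegistering = false

  def doneRegisteringOnDiskBlocks(): Unit = mergeReadyMonitor.synchronized {
    doneRegistering = true
    mergeReadyMonitor.notifyAll()
  }

  def awaitDoneRegistering(): Unit = mergeReadyMonitor.synchronized {
    // Loop rather than `if`, to tolerate spurious wakeups.
    while (!doneRegistering) {
      mergeReadyMonitor.wait()
    }
  }
}
```

A thread blocked in `awaitDoneRegistering()` is reliably released once another thread calls `doneRegisteringOnDiskBlocks()`.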

@jerryshao jerryshao force-pushed the sort-shuffle-read-new-netty branch from c3275ff to d6c94da Compare April 13, 2015 05:11
jerryshao and others added 9 commits April 13, 2015 13:15
Conflicts:
	core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
	core/src/main/scala/org/apache/spark/storage/BlockManager.scala
	core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala

Conflicts:
	core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@SparkQA

SparkQA commented Apr 13, 2015

Test build #30143 has finished for PR 3438 at commit d6c94da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class MemoryShuffleBlock(blockId: BlockId, blockData: ManagedBuffer)
    • final class ShuffleBlockFetcherIterator(
    • final class ShuffleRawBlockFetcherIterator(
    • case class DiskShuffleBlock(blockId: BlockId, file: File, len: Long)
  • This patch does not change any dependencies.

- assert(partitions(0) === Seq((0, 5), (0, 8), (2, 6)))
- assert(partitions(1) === Seq((1, 3), (3, 8), (3, 8)))
+ assert(partitions(0).toSet === Set((0, 5), (0, 8), (2, 6)))
+ assert(partitions(1).toSet === Set((1, 3), (3, 8), (3, 8)))
Contributor

Why change this? It would no longer be able to check the correct order after sorting.

Contributor Author

Hi @adrian-wang, the ordering is correct. Since we only guarantee by-key ordering, the order of tuples with the same key is not guaranteed by this PR; that would require a secondary sort.

Contributor Author

The original code will return (0, 5), (0, 8), (2, 6) and this PR will return (0, 8), (0, 5), (2, 6). I think both follow the criteria of sort-based shuffle (keeping keys in order). Maybe the change is not so straightforward, but it is correct from my understanding.

Contributor

From the test itself, we can't tell whether the tuples in partitions(0) or partitions(1) are sorted, right?

Contributor Author

Yeah, this is a problem. I will figure out a better way to test it.
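One way to test exactly the guarantee this PR makes is to assert that keys are non-decreasing without constraining the relative order of tuples that share a key. This is a hypothetical assertion helper for illustration, not code from the PR:

```scala
// Assert that records are sorted by key, without constraining the relative
// order of tuples that share a key (which this PR does not guarantee).
def assertSortedByKey[K, V](records: Seq[(K, V)])(implicit ord: Ordering[K]): Unit = {
  val keys = records.map(_._1)
  assert(keys == keys.sorted, s"keys not in order: $keys")
}
```

Both `Seq((0, 5), (0, 8), (2, 6))` and `Seq((0, 8), (0, 5), (2, 6))` pass this check, while a plain `=== Seq(...)` comparison would reject one of them.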

asfgit pushed a commit that referenced this pull request Apr 15, 2015
Thanks for the initial work from Ishiihara in #3173

This PR introduces a new join method, sort merge join, which first ensures that keys of the same value are in the same partition and that inside each partition the rows are sorted by key. Then we can run down both sides together and find matched rows using [sort merge join](http://en.wikipedia.org/wiki/Sort-merge_join). In this way, we don't have to store the whole hash table of one side as in hash join, so we use less memory. Also, this PR would benefit from #3438 , making the sorting phase much more efficient.

We introduced a new configuration, "spark.sql.planner.sortMergeJoin", to switch between this (`true`) and ShuffledHashJoin (`false`); probably we want its default value to be `false` at first.

Author: Daoyuan Wang <[email protected]>
Author: Michael Armbrust <[email protected]>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <[email protected]>

Closes #5208 from adrian-wang/smj and squashes the following commits:

2493b9f [Daoyuan Wang] fix style
5049d88 [Daoyuan Wang] propagate rowOrdering for RangePartitioning
f91a2ae [Daoyuan Wang] yin's comment: use external sort if option is enabled, add comments
f515cd2 [Daoyuan Wang] yin's comment: outputOrdering, join suite refine
ec8061b [Daoyuan Wang] minor change
413fd24 [Daoyuan Wang] Merge pull request #3 from marmbrus/pr/5208
952168a [Michael Armbrust] add type
5492884 [Michael Armbrust] copy when ordering
7ddd656 [Michael Armbrust] Cleanup addition of ordering requirements
b198278 [Daoyuan Wang] inherit ordering in project
c8e82a3 [Daoyuan Wang] fix style
6e897dd [Daoyuan Wang] hide boundReference from manually construct RowOrdering for key compare in smj
8681d73 [Daoyuan Wang] refactor Exchange and fix copy for sorting
2875ef2 [Daoyuan Wang] fix changed configuration
61d7f49 [Daoyuan Wang] add omitted comment
00a4430 [Daoyuan Wang] fix bug
078d69b [Daoyuan Wang] address comments: add comments, do sort in shuffle, and others
3af6ba5 [Daoyuan Wang] use buffer for only one side
171001f [Daoyuan Wang] change default outputordering
47455c9 [Daoyuan Wang] add apache license ...
a28277f [Daoyuan Wang] fix style
645c70b [Daoyuan Wang] address comments using sort
068c35d [Daoyuan Wang] fix new style and add some tests
925203b [Daoyuan Wang] address comments
07ce92f [Daoyuan Wang] fix ArrayIndexOutOfBound
42fca0e [Daoyuan Wang] code clean
e3ec096 [Daoyuan Wang] fix comment style..
2edd235 [Daoyuan Wang] fix outputpartitioning
57baa40 [Daoyuan Wang] fix sort eval bug
303b6da [Daoyuan Wang] fix several errors
95db7ad [Daoyuan Wang] fix brackets for if-statement
4464f16 [Daoyuan Wang] fix error
880d8e9 [Daoyuan Wang] sort merge join for spark sql
val partialMergedItr =
  MergeUtil.mergeSort(itrGroup, keyComparator, dep.keyOrdering, dep.aggregator)
val curWriteMetrics = new ShuffleWriteMetrics()
var writer = blockManager.getDiskWriter(tmpBlockId, file, ser, fileBufferSize, curWriteMetrics)
Contributor

getDiskWriter was changed in #5606.

@andrewor14
Contributor

@jerryshao unfortunately the shuffle code has changed significantly since this patch was last updated, and it is unlikely to be merged. Would you mind closing this patch for now? If there's interest we can always reopen it against the latest master branch.

@jerryshao jerryshao closed this Sep 2, 2015
@jerryshao
Contributor Author

OK, thanks a lot.

@yaooqinn
Member

Is there any progress on studying this mechanism?

9 participants