[SPARK-2926][Shuffle]Add MR style sort-merge shuffle read for Spark sort-based shuffle #3438
Conversation
Test build #23807 has finished for PR 3438 at commit
Test build #23809 has finished for PR 3438 at commit
```diff
@@ -17,6 +17,7 @@
 package org.apache.spark.rdd

+import scala.collection.mutable
```
Redundant imports
The main changes we implemented here are:
Hi @jerryshao @sryza, what is the status of this? Is it still WIP?
Hi @andrewor14, this is no longer a WIP. It requires a rebase, but I was hoping to get some feedback on the approach before working on that.
I see. By the way, in general it's good to remove the [WIP] tag from the title once it's ready for review.
Ah yeah, great point. @jerryshao mind updating the title? I don't have access.
Yeah, will do, thanks a lot :).
Force-pushed from 7d839cd to c3275ff.
Test build #27831 has finished for PR 3438 at commit
Hi @andrewor14, I've rebased this code against the latest master. Would you please help review it? Thanks a lot.
```scala
/** keyComparator for mergeSort; if keyOrdering is not available,
 *  use the hashcode of the key to compare */
private val keyComparator: Comparator[K] = dep.keyOrdering.getOrElse(new Comparator[K] {
  // body completed per the doc comment above: compare keys by their hashcodes
  override def compare(a: K, b: K) = java.lang.Integer.compare(
    if (a == null) 0 else a.hashCode(), if (b == null) 0 else b.hashCode())
})
```
We never run into this branch, right? Since dep.keyOrdering should always be defined for the SortShuffleReader, should it simply throw an exception instead?
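A minimal sketch of what the reviewer seems to suggest (assuming the surrounding SortShuffleReader fields; this is not the PR's actual code):

```scala
// Fail fast instead of falling back to a hashcode comparator, since the
// sort-merge read path is only used when a key ordering exists.
private val keyComparator: Comparator[K] = dep.keyOrdering.getOrElse(
  throw new IllegalStateException(
    "SortShuffleReader requires dep.keyOrdering to be defined"))
```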
Thanks @jerryshao @sryza, I have some minor comments on this. cc @andrewor14 @rxin
```scala
/**
 * Notify the merger that no more on-disk blocks will be registered.
 */
def doneRegisteringOnDiskBlocks(): Unit = {
  doneRegistering = true
}
```
There is a deadlock with line 176, so I think we need to put `doneRegistering = true` inside `mergeReadyMonitor.synchronized {}`.
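A minimal sketch of the suggested fix, assuming `mergeReadyMonitor` and `doneRegistering` are the merger's existing fields (the real patch may differ):

```scala
def doneRegisteringOnDiskBlocks(): Unit = mergeReadyMonitor.synchronized {
  doneRegistering = true
  // Wake any merge thread waiting on the monitor so it re-checks the flag,
  // avoiding the lost-wakeup deadlock described above.
  mergeReadyMonitor.notifyAll()
}
```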
Thanks a lot @lianhuiwang for your comments; we've also hit this issue when running queries. I will fix this ASAP.
Force-pushed from c3275ff to d6c94da.
Conflicts:
- core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
- core/src/main/scala/org/apache/spark/storage/BlockManager.scala
- core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
- core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
…k size, and properly increment _diskBytesSpilled
Test build #30143 has finished for PR 3438 at commit
```diff
-assert(partitions(0) === Seq((0, 5), (0, 8), (2, 6)))
-assert(partitions(1) === Seq((1, 3), (3, 8), (3, 8)))
+assert(partitions(0).toSet === Set((0, 5), (0, 8), (2, 6)))
+assert(partitions(1).toSet === Set((1, 3), (3, 8), (3, 8)))
```
Why change this? It would no longer be able to check the right order after the sort.
Hi @adrian-wang, the ordering is correct. We only guarantee by-key ordering; the order of tuples with the same key is not guaranteed by this PR, as that would require a secondary sort.
The original code will return `(0, 5), (0, 8), (2, 6)` and this PR will return `(0, 8), (0, 5), (2, 6)`. I think they both follow the criteria of sort-based shuffle (keeping the key ordering). Maybe the change is not so straightforward, but it is correct from my understanding.
From the test itself, it couldn't tell whether the tuples in partitions(0) or partitions(1) are sorted, right?
Yeah, this is a problem. I will figure out a better way to test it.
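One possible approach, as a hedged sketch (assuming the same `partitions` fixture as the test above): check that keys are non-decreasing, and compare contents as sorted sequences rather than sets, so duplicates like `(3, 8)` are not collapsed.

```scala
val p0 = partitions(0)
// Keys must be non-decreasing: by-key ordering is all this PR guarantees.
assert(p0.map(_._1) === p0.map(_._1).sorted)
// Same multiset of tuples, regardless of order among equal keys;
// unlike toSet, this keeps the duplicate (3, 8) in partitions(1).
assert(partitions(1).sorted === Seq((1, 3), (3, 8), (3, 8)).sorted)
```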
Thanks for the initial work from Ishiihara in #3173.

This PR introduces a new join method, sort merge join. It first ensures that keys of the same value are in the same partition and that, inside each partition, the Rows are sorted by key. Then we can run down both sides together and find matched rows using [sort merge join](http://en.wikipedia.org/wiki/Sort-merge_join). In this way, we don't have to store the whole hash table of one side as hash join does, so we use less memory. Also, this PR would benefit from #3438, making the sorting phase much more efficient. We introduced a new configuration, "spark.sql.planner.sortMergeJoin", to switch between this (`true`) and ShuffledHashJoin (`false`); probably we want its default value to be `false` at first.

Author: Daoyuan Wang <[email protected]>
Author: Michael Armbrust <[email protected]>

This patch had conflicts when merged, resolved by Committer: Michael Armbrust <[email protected]>

Closes #5208 from adrian-wang/smj and squashes the following commits:

2493b9f [Daoyuan Wang] fix style
5049d88 [Daoyuan Wang] propagate rowOrdering for RangePartitioning
f91a2ae [Daoyuan Wang] yin's comment: use external sort if option is enabled, add comments
f515cd2 [Daoyuan Wang] yin's comment: outputOrdering, join suite refine
ec8061b [Daoyuan Wang] minor change
413fd24 [Daoyuan Wang] Merge pull request #3 from marmbrus/pr/5208
952168a [Michael Armbrust] add type
5492884 [Michael Armbrust] copy when ordering
7ddd656 [Michael Armbrust] Cleanup addition of ordering requirements
b198278 [Daoyuan Wang] inherit ordering in project
c8e82a3 [Daoyuan Wang] fix style
6e897dd [Daoyuan Wang] hide boundReference from manually construct RowOrdering for key compare in smj
8681d73 [Daoyuan Wang] refactor Exchange and fix copy for sorting
2875ef2 [Daoyuan Wang] fix changed configuration
61d7f49 [Daoyuan Wang] add omitted comment
00a4430 [Daoyuan Wang] fix bug
078d69b [Daoyuan Wang] address comments: add comments, do sort in shuffle, and others
3af6ba5 [Daoyuan Wang] use buffer for only one side
171001f [Daoyuan Wang] change default outputordering
47455c9 [Daoyuan Wang] add apache license
...
a28277f [Daoyuan Wang] fix style
645c70b [Daoyuan Wang] address comments using sort
068c35d [Daoyuan Wang] fix new style and add some tests
925203b [Daoyuan Wang] address comments
07ce92f [Daoyuan Wang] fix ArrayIndexOutOfBound
42fca0e [Daoyuan Wang] code clean
e3ec096 [Daoyuan Wang] fix comment style..
2edd235 [Daoyuan Wang] fix outputpartitioning
57baa40 [Daoyuan Wang] fix sort eval bug
303b6da [Daoyuan Wang] fix several errors
95db7ad [Daoyuan Wang] fix brackets for if-statement
4464f16 [Daoyuan Wang] fix error
880d8e9 [Daoyuan Wang] sort merge join for spark sql
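A minimal, Spark-independent sketch of the sort-merge join idea the description above outlines (illustrative code, not the PR's actual operators): both inputs are sorted by key, so one linear pass can pair up matching rows without building a hash table.

```scala
// Inner join of two key-sorted sequences by advancing two cursors in lockstep.
def sortMergeJoin[K: Ordering, A, B](
    left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
  val ord = implicitly[Ordering[K]]
  val out = scala.collection.mutable.ArrayBuffer.empty[(K, (A, B))]
  var i = 0
  var j = 0
  while (i < left.length && j < right.length) {
    val c = ord.compare(left(i)._1, right(j)._1)
    if (c < 0) i += 1        // left key is smaller: advance left
    else if (c > 0) j += 1   // right key is smaller: advance right
    else {
      // Keys match: emit the cross product of the runs sharing this key.
      val k = left(i)._1
      val jStart = j
      while (i < left.length && left(i)._1 == k) {
        var jj = jStart
        while (jj < right.length && right(jj)._1 == k) {
          out += ((k, (left(i)._2, right(jj)._2)))
          jj += 1
        }
        i += 1
      }
      while (j < right.length && right(j)._1 == k) j += 1
    }
  }
  out.toSeq
}
```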
```scala
val partialMergedItr =
  MergeUtil.mergeSort(itrGroup, keyComparator, dep.keyOrdering, dep.aggregator)
val curWriteMetrics = new ShuffleWriteMetrics()
var writer = blockManager.getDiskWriter(tmpBlockId, file, ser, fileBufferSize, curWriteMetrics)
```
`getDiskWriter` has changed in #5606.
@jerryshao unfortunately the shuffle code has changed significantly since this patch was last updated, and it is unlikely to be merged. Would you mind closing this patch for now? If there's interest we can always reopen it against the latest master branch.
OK, thanks a lot.
Has there been any progress on studying this mechanism?
This is joint work with @sryza. Details and a performance test report can be found in SPARK-2926.
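For readers arriving from SPARK-2926, a minimal, hypothetical sketch of the core idea of a sort-merge shuffle read (illustrative names, not the PR's actual API): each fetched block is already sorted by key, so the reader can stream a k-way merge instead of building a hash map.

```scala
import java.util.Comparator
import scala.collection.mutable

// Merge k key-sorted iterators into one key-sorted iterator.
def kWayMerge[K, V](
    sortedBlocks: Seq[Iterator[(K, V)]],
    keyComparator: Comparator[K]): Iterator[(K, V)] = {
  // PriorityQueue is a max-heap, so reverse the ordering to pop the
  // iterator whose head has the smallest key first.
  implicit val ord: Ordering[BufferedIterator[(K, V)]] =
    Ordering.by[BufferedIterator[(K, V)], K](_.head._1)(
      Ordering.comparatorToOrdering(keyComparator)).reverse
  val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]]
  sortedBlocks.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))
  new Iterator[(K, V)] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): (K, V) = {
      val it = heap.dequeue()          // iterator with the smallest head key
      val kv = it.next()
      if (it.hasNext) heap.enqueue(it) // re-insert with its new head element
      kv
    }
  }
}
```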