[SPARK-7165] [SQL] use sort merge join for outer join #5717
case FullOuter =>
  left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
case x =>
  throw new Exception(s"SortMergeJoin should not take $x as the JoinType")
Use sys.error here instead of throwing a generic Exception.
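For context on the FullOuter case in the snippet above: in an outer join, whichever side can be padded with nulls has to be marked nullable in the operator's output. Below is a minimal sketch of that rule. `Attr` and `NullabilitySketch` are hypothetical stand-ins, not Catalyst's real classes, and the Inner, LeftOuter and RightOuter branches are assumptions based on standard join semantics, since only the FullOuter branch is visible in the diff.

```scala
object NullabilitySketch {
  // Hypothetical stand-ins for Catalyst's join types and attributes.
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  final case class Attr(name: String, nullable: Boolean) {
    def withNullability(n: Boolean): Attr = copy(nullable = n)
  }

  // The side that may be filled with nulls is forced to nullable = true.
  def output(joinType: JoinType, left: Seq[Attr], right: Seq[Attr]): Seq[Attr] =
    joinType match {
      case Inner      => left ++ right
      case LeftOuter  => left ++ right.map(_.withNullability(true))
      case RightOuter => left.map(_.withNullability(true)) ++ right
      case FullOuter  => left.map(_.withNullability(true)) ++ right.map(_.withNullability(true))
    }
}
```

Because the match is over a sealed trait, the compiler can check exhaustiveness, which is one way to avoid needing a catch-all error branch at all.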
retest this please.
override def outputPartitioning: Partitioning = left.outputPartitioning
override def outputPartitioning: Partitioning = joinType match {
Note: this should always be streamed.outputPartitioning once #6413 is merged; see https://github.com/apache/spark/pull/6413/files#diff-48230fdc68c8c172d22709ed90f8817dR50
Jenkins, retest this please.
private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
  keys.map(SortOrder(_, Ascending))

protected override def doExecute(): RDD[Row] = {
  val leftResults = left.execute().map(_.copy())
  val rightResults = right.execute().map(_.copy())
  val streamResults = streamed.execute().map(_.copy())
Why do we need to copy the streamed rows? I understand why we need to do the copy for the buffered results, since we might be dealing with mutable input rows, but that shouldn't be a problem for the stream side, right?
I think we need to copy this; it has something to do with the external sort.
We certainly need to copy the inputs that are passed to external sort, but the ExternalSort operator itself should take care of that. Here, I think we're consuming the result of a sort operator and are not buffering rows from streamResults (unless I've overlooked other buffering inside of zipPartitions somehow).
That's true for left/right outer and even inner joins. However, in a full outer join we probably need to cache the streamed row once. But you're right: we can do the copy whenever necessary while iterating, not here.
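To illustrate the point of this thread, here is a small self-contained sketch of why buffered rows need a copy while purely streamed rows do not. `MutableRow` and `reusingIterator` are hypothetical stand-ins for an upstream operator that reuses one mutable row object; this is not the PR's code.

```scala
object RowCopySketch {
  // Hypothetical mutable row, standing in for a row object reused by an upstream operator.
  final class MutableRow(var key: Int, var value: Int) {
    def copy(): MutableRow = new MutableRow(key, value)
  }

  // An iterator that returns the SAME row instance for every element, mutated in place.
  def reusingIterator(data: Seq[(Int, Int)]): Iterator[MutableRow] = {
    val shared = new MutableRow(0, 0)
    data.iterator.map { case (k, v) =>
      shared.key = k
      shared.value = v
      shared
    }
  }

  // Buffering without a copy: every buffered element aliases the one shared object.
  def bufferWithoutCopy(data: Seq[(Int, Int)]): Seq[(Int, Int)] =
    reusingIterator(data).toVector.map(r => (r.key, r.value))

  // Copying at the point of buffering preserves each row's values.
  def bufferWithCopy(data: Seq[(Int, Int)]): Seq[(Int, Int)] =
    reusingIterator(data).map(_.copy()).toVector.map(r => (r.key, r.value))

  // Streaming: each row is fully consumed before the next one is fetched, so no copy is needed.
  def streamSum(data: Seq[(Int, Int)]): Int =
    reusingIterator(data).map(_.value).sum
}
```

With input `Seq((1, 3), (1, 1), (2, 1))`, the uncopied buffer ends up holding three aliases of the last row, while the copied buffer keeps the original values. That is the argument for copying only where a row is actually retained, rather than on every streamed row.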
@JoshRosen Thanks for your comments, I will refine the code accordingly.
By the way, to provide a bit of context for why I'm reviewing this PR: I'm working on some optimizations to sorting in Spark SQL which should benefit sort-merge-join, so I've looked over all of this code pretty recently.
@adrian-wang, I'm planning to take another pass on this pretty soon. At a high level, this patch is in very good shape since most of its code is modeled after other existing join implementations in Spark SQL. If you update this in the next couple of days, I'll try my best to be responsive with my reviews so we can get this in soon and not have too many merge conflicts.
retest this please.
Thanks for updating this; I'll try to take another review pass tomorrow.
if (bufferedPosition >= bufferedMatches.size) {
  bufferedPosition = 0
  if (joinType != FullOuter || secondStreamedElement == null) {
    fetchStreamed()
I think we should use boundCondition to update bufferedMatches after we call fetchStreamed(). Otherwise we may get a wrong answer. For example:
table a(key int, value int); table b(key int, value int)
data of a
1 3
1 1
2 1
2 3
data of b
1 1
2 1
select a.key,b.key,a.value-b.value from a left outer join b on a.key=b.key and a.value - b.value > 1
Good catch, I'll rewrite this part.
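A regression test for this case needs a known-good answer. The sketch below computes the example query with plain nested-loop semantics, so it can serve as a reference result for any sort merge implementation. It is not the PR's code; `Rec` and `leftOuter` are hypothetical names.

```scala
object OuterJoinConditionSketch {
  final case class Rec(key: Int, value: Int)

  // Reference (nested-loop) semantics for:
  //   select a.key, b.key, a.value - b.value
  //   from a left outer join b on a.key = b.key and a.value - b.value > 1
  // The extra condition is evaluated per (a, b) pair; an `a` row with no
  // surviving pair is emitted once, padded with None (SQL NULL).
  def leftOuter(a: Seq[Rec], b: Seq[Rec]): Seq[(Int, Option[Int], Option[Int])] =
    a.flatMap { l =>
      val matches = b.filter(r => l.key == r.key && l.value - r.value > 1)
      if (matches.isEmpty) Seq((l.key, Option.empty[Int], Option.empty[Int]))
      else matches.map(r => (l.key, Option(r.key), Option(l.value - r.value)))
    }
}
```

For the data in the comment above, the rows (1, 3) and (2, 3) of `a` each match their `b` row with a difference of 2, while (1, 1) and (2, 1) fail the condition and come out null-padded. An implementation that reuses buffered matches for the next streamed row with the same key, without re-checking the condition, would wrongly emit a match for (1, 1).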
@JoshRosen I've fixed the bug that @jeanlyn mentioned. Can you merge this first and then do the following steps in #7904?
We should have a test to guard against reintroduction of the bug that @jeanlyn mentioned. I find the code here to be really dense and hard to understand, so I'd like to try to pursue my design first. There's another 1.5 blocker / critical related to eliminating JoinedRow in favor of Tungsten's RowJoiner when UnsafeRows are used, and I think that the code re-use enabled by my design will make this significantly easier to accomplish.
Also, I think that it might be a little clearer to introduce a separate |
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
  classOf[BroadcastHashOuterJoin]),
It looks like this patch causes us to plan SortMergeJoin for outer joins that are capable of using BroadcastHashOuterJoin, which seems like it could lead to performance issues by triggering unnecessary shuffling of the large table.
As a result, I think that we should not change the broadcast-enabled half of the test, but, rather, should update the broadcast-disabled half to test both the sort-merge-join enabled and sort-merge-join-disabled configurations.
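The precedence being asked for here can be sketched as a small decision function. This is only an illustration under assumptions: `choose`, its two boolean inputs, and the `ShuffledHashOuter` fallback name are hypothetical and do not come from the patch or from Spark's planner API. Only the idea that broadcast should win over sort merge is taken from the comment above.

```scala
object JoinSelectionSketch {
  sealed trait Strategy
  case object BroadcastHashOuter extends Strategy
  case object SortMergeOuter extends Strategy
  case object ShuffledHashOuter extends Strategy // assumed fallback, not named in this thread

  // canBroadcast: the side eligible for broadcasting is small enough (hypothetical input).
  // sortMergeEnabled: the sort merge join setting is switched on (hypothetical input).
  def choose(canBroadcast: Boolean, sortMergeEnabled: Boolean): Strategy =
    if (canBroadcast) BroadcastHashOuter   // avoids shuffling the large table
    else if (sortMergeEnabled) SortMergeOuter
    else ShuffledHashOuter
}
```

Under this ordering the broadcast-enabled test expectations stay unchanged, and only the broadcast-disabled half needs two variants, one per value of `sortMergeEnabled`.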
retest this please.
}

/**
 * Searches the right iterator for the next rows that have matches in left side, and store
 * them in a buffer.
 * Searches the right iterator for the next rows that have matches in left side (only check
Is it confusing because right and left can be both streamed or buffered here? Do we need to use streamed and buffered in the comments as well?
This may be clarified slightly in my own SMJ patch, #7904.
…t outer join

This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join. It also refactors `SortMergeJoin` in order to improve performance and code clarity.

Along the way, I also performed a couple pieces of minor cleanup and optimization:

- Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins.
- Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators.
- Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings.

This patch incorporates several ideas from adrian-wang's patch, #5717.

Closes #5717.

Author: Josh Rosen
Author: Daoyuan Wang

Closes #7904 from JoshRosen/outer-join-smj and squashes 1 commits.

(cherry picked from commit 91e9389)
Signed-off-by: Reynold Xin
This is an extended version of #5208
In this patch, we are introducing sort merge join not only for inner joins, but also for left outer, right outer, and full outer joins.
Using sort merge join could resolve the OOM errors that are quite common when joining large tables, where memory easily becomes too small.
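As a rough illustration of the technique (not this patch's implementation), here is a minimal left outer sort merge join over in-memory inputs that are already sorted by key. `leftOuterJoin` and its tuple-based row representation are hypothetical. The property worth noticing is that only the group of right-side rows sharing the current key is buffered at any time.

```scala
object SortMergeSketch {
  // Left outer sort merge join. Both inputs must be sorted by key, ascending.
  // The left side is streamed; the right side is buffered one key group at a time.
  def leftOuterJoin[A, B](
      left: Seq[(Int, A)],
      right: Seq[(Int, B)]): Seq[(Int, A, Option[B])] = {
    val out = Vector.newBuilder[(Int, A, Option[B])]
    val rightIter = right.iterator.buffered
    var bufferedKey: Option[Int] = None
    var buffered = Vector.empty[B]

    for ((key, a) <- left) {
      if (!bufferedKey.contains(key)) {
        // Skip right rows with smaller keys, then buffer the group equal to `key`.
        while (rightIter.hasNext && rightIter.head._1 < key) rightIter.next()
        val group = Vector.newBuilder[B]
        while (rightIter.hasNext && rightIter.head._1 == key) group += rightIter.next()._2
        buffered = group.result()
        bufferedKey = Some(key)
      }
      // No match: emit the left row once, padded with None.
      if (buffered.isEmpty) out += ((key, a, None))
      else buffered.foreach(b => out += ((key, a, Some(b))))
    }
    out.result()
  }
}
```

A right outer join follows by swapping the roles of the two sides. A full outer join additionally has to emit the right rows that get skipped, which is where most of the extra complexity discussed in this thread comes from.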
Test cases are available in SortMergeCompatibilitySuite.
Also, this patch would benefit from #3438 quite a lot.
/cc @chenghao-intel