Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7165] [SQL] use sort merge join for outer join #5717

Closed
wants to merge 10 commits into from

Conversation

adrian-wang
Copy link
Contributor

This is an extended version of #5208
In this patch, we are introducing sort merge join for not only inner joins, but left outer/ right outer/ full outer joins.
Using sort merge join could resolve the OOM which is quite common as the memory easily becomes too small for joins of large tables.

Test cases are always available in SortMergeCompatibilitySuite.
Also , This patch would benefit from #3438 quite a lot.

/cc @chenghao-intel

@SparkQA
Copy link

SparkQA commented Apr 27, 2015

Test build #30964 has finished for PR 5717 at commit fc862f4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA
Copy link

SparkQA commented Apr 28, 2015

Test build #31103 has finished for PR 5717 at commit ae68ee7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@SparkQA
Copy link

SparkQA commented May 20, 2015

Test build #33121 has finished for PR 5717 at commit 44fd7cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case FullOuter =>
left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
case x =>
throw new Exception(s"SortMergeJoin should not take $x as the JoinType")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sys.error

@SparkQA
Copy link

SparkQA commented May 27, 2015

Test build #33580 has finished for PR 5717 at commit 6aaa593.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@adrian-wang
Copy link
Contributor Author

retest this please.

@SparkQA
Copy link

SparkQA commented May 27, 2015

Test build #33582 has finished for PR 5717 at commit 6aaa593.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


override def outputPartitioning: Partitioning = left.outputPartitioning
override def outputPartitioning: Partitioning = joinType match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTICE: Should always be streamed.outputPartitioning once #6413 merged, see https://github.com/apache/spark/pull/6413/files#diff-48230fdc68c8c172d22709ed90f8817dR50

@SparkQA
Copy link

SparkQA commented May 29, 2015

Test build #33707 has finished for PR 5717 at commit add49a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor

Jenkins, retest this please.


private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
keys.map(SortOrder(_, Ascending))

protected override def doExecute(): RDD[Row] = {
val leftResults = left.execute().map(_.copy())
val rightResults = right.execute().map(_.copy())
val streamResults = streamed.execute().map(_.copy())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to copy the streamed rows? I understand why we need to do the copy for the buffered results, since we might be dealing with mutable input rows, but that shouldn't be a problem for the stream side, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to copy this, it has something to do with the external sort.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We certainly need to copy the inputs that are passed to external sort, but the ExternalSort operator itself should take care of that. Here, I think we're consuming the result of a sort operator and are not buffering rows from streamResults (unless I've overlooked other buffering inside of zipPartitions somehow).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true for Left/Right Outer and even inner join, however, in full outer join, we probably need to cache the streamed row once, but you're right, we can do the copy whenever necessary during the iterating, not here.

@SparkQA
Copy link

SparkQA commented Jun 12, 2015

Test build #34777 has finished for PR 5717 at commit add49a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@adrian-wang
Copy link
Contributor Author

@JoshRosen Thanks for your comments, I will refine the code accordingly.

@JoshRosen
Copy link
Contributor

By the way, to provide a bit of context for why I'm reviewing this PR: I'm working on some optimizations to sorting in Spark SQL which should benefit sort-merge-join, so I've looked over all of this code pretty recently.

@JoshRosen
Copy link
Contributor

@adrian-wang, I'm planning to take another pass on this pretty soon. At a high level, this patch is in very good shape since most of its code is modeled after other existing join implementations in Spark SQL. If you update this in the next couple of days, I'll try my best to be responsive with my reviews so we can get this in soon and not have too many merge conflicts.

@SparkQA
Copy link

SparkQA commented Jun 19, 2015

Test build #35233 has finished for PR 5717 at commit 211e101.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SerializableConfiguration(@transient var value: Configuration) extends Serializable
    • class SerializableJobConf(@transient var value: JobConf) extends Serializable
    • class ElementwiseProduct(VectorTransformer):
    • case class CreateStruct(children: Seq[Expression]) extends Expression
    • case class Sqrt(child: Expression) extends UnaryMathExpression(math.sqrt, "SQRT")
    • case class Logarithm(left: Expression, right: Expression)
    • case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand with Logging

@adrian-wang
Copy link
Contributor Author

retest this please.

@SparkQA
Copy link

SparkQA commented Jun 20, 2015

Test build #35337 has finished for PR 5717 at commit 211e101.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SerializableConfiguration(@transient var value: Configuration) extends Serializable
    • class SerializableJobConf(@transient var value: JobConf) extends Serializable

@JoshRosen
Copy link
Contributor

Thanks for updating this; I'll try to take another review pass tomorrow.

if (bufferedPosition >= bufferedMatches.size) {
bufferedPosition = 0
if (joinType != FullOuter || secondStreamedElement == null) {
fetchStreamed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use boundCondition to update bufferedMatches after we fetchStreamed () .Otherwise we may get wrong answer.For example

table a(key int,value int);table b(key int,value int)
data of a
1 3
1 1
2 1
2 3

data of b
1 1
2 1
select a.key,b.key,a.value-b.value from a left outer join b on a.key=b.key and a.value - b.value > 1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I'll rewrite this part.

@SparkQA
Copy link

SparkQA commented Jul 30, 2015

Test build #39030 has finished for PR 5717 at commit fd73084.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 31, 2015

Test build #39159 has finished for PR 5717 at commit f520079.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 4, 2015

Test build #39678 has finished for PR 5717 at commit d0e65c5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 4, 2015

Test build #39689 has finished for PR 5717 at commit bff834a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@adrian-wang
Copy link
Contributor Author

@JoshRosen I've fixed the bug that @jeanlyn mentioned, can you merge this first and then do the following steps in #7904 ?

@JoshRosen
Copy link
Contributor

We should have a test to guard against reintroduction of the the bug that @jeanlyn mentioned.

I find the code here to be really dense and hard to understand, so I'd like to try to pursue my design first. There's another 1.5 blocker / critical related to eliminating JoinedRow in favor of Tungsten's RowJoiner when UnsafeRows are used, and I think that the code re-use enabled by my design will make this significantly easier to accomplish.

@JoshRosen
Copy link
Contributor

Also, I think that it might be a little clearer to introduce a separate SortMergeOuterJoin operator rather than trying to combine the inner and outer joins into the same operator. This would be consistent with what we've done for other joins.

("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
classOf[BroadcastHashOuterJoin]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this patch causes us to plan SortMergeJoin for outer joins that are capable of using BroadcastHashOuterJoin, which seems like it could lead to performance issues by triggering unnecessary shuffling of the large table.

As a result, I think that we should not change the broadcast-enabled half of the test, but, rather, should update the broadcast-disabled half to test both the sort-merge-join enabled and sort-merge-join-disabled configurations.

@adrian-wang
Copy link
Contributor Author

retest this please.

@SparkQA
Copy link

SparkQA commented Aug 5, 2015

Test build #227 has finished for PR 5717 at commit 549796e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 5, 2015

Test build #39844 has finished for PR 5717 at commit 549796e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 6, 2015

Test build #39983 has finished for PR 5717 at commit d02f6bb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/**
* Searches the right iterator for the next rows that have matches in left side, and store
* them in a buffer.
* Searches the right iterator for the next rows that have matches in left side (only check
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it confusing because right and left can be both streamed or buffered here? Do we need to use streamed and buffered in the comments as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be clarified slightly in my own SMJ patch, #7904.

asfgit pushed a commit that referenced this pull request Aug 11, 2015
…t outer join

This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join.  It also refactors `SortMergeJoin` in order to improve performance and code clarity.

Along the way, I also performed a couple pieces of minor cleanup and optimization:

- Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins.
- Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators.
- Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings.

This patch incorporates several ideas from adrian-wang's patch, #5717.

Closes #5717.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7904)
<!-- Reviewable:end -->

Author: Josh Rosen <[email protected]>
Author: Daoyuan Wang <[email protected]>

Closes #7904 from JoshRosen/outer-join-smj and squashes 1 commits.

(cherry picked from commit 91e9389)
Signed-off-by: Reynold Xin <[email protected]>
@asfgit asfgit closed this in 91e9389 Aug 11, 2015
CodingCat pushed a commit to CodingCat/spark that referenced this pull request Aug 17, 2015
…t outer join

This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join.  It also refactors `SortMergeJoin` in order to improve performance and code clarity.

Along the way, I also performed a couple pieces of minor cleanup and optimization:

- Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins.
- Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators.
- Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings.

This patch incorporates several ideas from adrian-wang's patch, apache#5717.

Closes apache#5717.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/7904)
<!-- Reviewable:end -->

Author: Josh Rosen <[email protected]>
Author: Daoyuan Wang <[email protected]>

Closes apache#7904 from JoshRosen/outer-join-smj and squashes 1 commits.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants