Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer join #7904

Closed
wants to merge 62 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
be19a0f
[SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use new…
JoshRosen Aug 5, 2015
34b8e0c
Import ordering
JoshRosen Aug 5, 2015
e610655
Add comment RE: Ascending ordering
JoshRosen Aug 5, 2015
df88548
Squash @adrian-wang's changes.
adrian-wang Jun 19, 2015
58edb2e
Remove old TODO
JoshRosen Aug 3, 2015
9faa2ee
Use withSQLConf in JoinSuite
JoshRosen Aug 4, 2015
8d83e15
Use explicit toScala conversions in ShuffledHashOuterJoin.
JoshRosen Aug 4, 2015
a471a6e
Revert changes to SortMergeJoin; add new SortMergeOuterJoin operator
JoshRosen Aug 4, 2015
cf8c042
Fix join operator selection for outer join:
JoshRosen Aug 4, 2015
a09d6e3
Rename HashOuterJoin to OuterJoin.
JoshRosen Aug 4, 2015
58b2d1c
Clean up non-obvious side-effect in JoinedRow.with[Left|Right]
JoshRosen Aug 4, 2015
07ef478
Style cleanup in flatMap; use curly braces instead of parens.
JoshRosen Aug 4, 2015
c3c7ed4
Move initialize() definition closer to usage.
JoshRosen Aug 4, 2015
78714dd
Large refactoring of SMJ internals to improve clarity.
JoshRosen Aug 5, 2015
124f4ba
Remove unnecessary row copying.
JoshRosen Aug 5, 2015
8c50c30
Support SMJ for left outer join.
JoshRosen Aug 5, 2015
8dade55
Also enable for right outer join.
JoshRosen Aug 5, 2015
8e496b2
Fix scalastyle
JoshRosen Aug 5, 2015
6587ef2
Rewrite OuterJoinSuite in preparation for adding more tests.
JoshRosen Aug 5, 2015
a8d1074
Merge branch 'SPARK-9054' into outer-join-smj
JoshRosen Aug 5, 2015
4603081
Merge remote-tracking branch 'origin/master' into outer-join-smj
JoshRosen Aug 5, 2015
681e879
Add tests for outer joins with both inputs empty
JoshRosen Aug 6, 2015
3772505
Fix two minor bugs in SMJ (regression tests pending)
JoshRosen Aug 6, 2015
716bdff
Merge remote-tracking branch 'origin/master' into outer-join-smj
JoshRosen Aug 6, 2015
82632c8
Allow UnsafeRows to be processed in SortMergeJoin
JoshRosen Aug 7, 2015
6e18bc3
Rename HashJoin to EquiJoinSelection
JoshRosen Aug 7, 2015
289e91d
Remove unnecessary requiredChildDistribution from BroadcastHashOuterJoin
JoshRosen Aug 7, 2015
e3f6d71
Use ArrayBuffer instead of CompactBuffer
JoshRosen Aug 7, 2015
075f372
Add missing row key null checks in BroadcastHashOuterJoin
JoshRosen Aug 7, 2015
df250c8
Update to reflect deferral of full outer join to followup patch
JoshRosen Aug 7, 2015
6bbde8c
Merge remote-tracking branch 'origin/master' into outer-join-smj
JoshRosen Aug 7, 2015
bdf513c
Rename build to buffered
JoshRosen Aug 7, 2015
1d8a48c
Update SortMergeJoin to output UnsafeRow in Unsafe mode
JoshRosen Aug 7, 2015
82b7e45
Try to clean up confusingly dense one-liner
JoshRosen Aug 7, 2015
4a4590f
Commment update
JoshRosen Aug 7, 2015
93723e2
Experiment towards using efficient internal iterators.
JoshRosen Aug 7, 2015
7712f7e
Merge remote-tracking branch 'origin/master' into outer-join-smj
JoshRosen Aug 7, 2015
441b89a
Back out now-unnecessary changes to other OuterJoin operators.
JoshRosen Aug 7, 2015
d16b60a
Revert another unnecessary change.
JoshRosen Aug 7, 2015
2c1253f
Add RowIterator.fromScala and use it to guarantee that copying is unn…
JoshRosen Aug 7, 2015
2b68452
Override row format methods for SortMergeOuterJoin
JoshRosen Aug 7, 2015
db80faa
Merge remote-tracking branch 'origin/master' into outer-join-smj
JoshRosen Aug 7, 2015
d41ac51
Fix bug in advancing leftIdx/rightIdx.
JoshRosen Aug 8, 2015
9f48a5c
Efficiency improvement in boundCondition.
JoshRosen Aug 8, 2015
1813a45
For left and right outer joins, streamed rows should not have null jo…
JoshRosen Aug 8, 2015
f183307
Two minor comments on output ordering
JoshRosen Aug 8, 2015
f456086
Add note RE: non-nullability of streamed side's join keys.
JoshRosen Aug 8, 2015
2e5eb2d
Fix loss of rows when removing RowIteratorToScala wrapper.
JoshRosen Aug 8, 2015
a7a24f5
Use RowIterator in SortMergeJoin as well
JoshRosen Aug 8, 2015
7910e83
Add giant comment to RowIterator
JoshRosen Aug 8, 2015
fd439cb
Move RowIterator to execution package
JoshRosen Aug 8, 2015
51ee4b2
Remove incorrect assertions; the non-join-key columns can be null
JoshRosen Aug 8, 2015
e23db3d
Experiment with removing copy
JoshRosen Aug 8, 2015
f701652
Fix incorrectly-placed null check.
JoshRosen Aug 9, 2015
7d3cc5d
It turns out that the copy is unnecessary.
JoshRosen Aug 9, 2015
f83b412
Push null check into buffered iterator next().
JoshRosen Aug 9, 2015
81956b0
Improve unit test coverage of join physical operators.
JoshRosen Aug 6, 2015
899dce2
Expand test data to cover multiple buffered rows per group.
JoshRosen Aug 10, 2015
e79909e
Fix parallelism in join operator unit tests.
JoshRosen Aug 11, 2015
5c34f75
Add regression test exposing bug with missing while loop
JoshRosen Aug 11, 2015
c188a21
Fix while loops while adding regression tests.
JoshRosen Aug 11, 2015
eabacca
comment updates
JoshRosen Aug 11, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ class JoinedRow extends InternalRow {
}

/** Updates this JoinedRow to used point at two new base rows. Returns itself. */
def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
def apply(r1: InternalRow, r2: InternalRow): JoinedRow = {
row1 = r1
row2 = r2
this
}

/** Updates this JoinedRow by updating its left base row. Returns itself. */
def withLeft(newLeft: InternalRow): InternalRow = {
def withLeft(newLeft: InternalRow): JoinedRow = {
row1 = newLeft
this
}

/** Updates this JoinedRow by updating its right base row. Returns itself. */
def withRight(newRight: InternalRow): InternalRow = {
def withRight(newRight: InternalRow): JoinedRow = {
row2 = newRight
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
HashAggregation ::
Aggregation ::
LeftSemiJoin ::
HashJoin ::
EquiJoinSelection ::
InMemoryScans ::
BasicOperators ::
CartesianProduct ::
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import java.util.NoSuchElementException

import org.apache.spark.sql.catalyst.InternalRow

/**
* An internal iterator interface which presents a more restrictive API than
* [[scala.collection.Iterator]].
*
* One major departure from the Scala iterator API is the fusing of the `hasNext()` and `next()`
* calls: Scala's iterator allows users to call `hasNext()` without immediately advancing the
* iterator to consume the next row, whereas RowIterator combines these calls into a single
* [[advanceNext()]] method.
*/
private[sql] abstract class RowIterator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need RowIterator?

(we do not need to make any change for this in this PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need it anymore, strictly speaking, although I think it's a nicer API to implement against.

/**
* Advance this iterator by a single row. Returns `false` if this iterator has no more rows
* and `true` otherwise. If this returns `true`, then the new row can be retrieved by calling
* [[getRow]].
*/
def advanceNext(): Boolean

/**
* Retrieve the row from this iterator. This method is idempotent. It is illegal to call this
* method after [[advanceNext()]] has returned `false`.
*/
def getRow: InternalRow

/**
* Convert this RowIterator into a [[scala.collection.Iterator]].
*/
def toScala: Iterator[InternalRow] = new RowIteratorToScala(this)
}

object RowIterator {
def fromScala(scalaIter: Iterator[InternalRow]): RowIterator = {
scalaIter match {
case wrappedRowIter: RowIteratorToScala => wrappedRowIter.rowIter
case _ => new RowIteratorFromScala(scalaIter)
}
}
}

private final class RowIteratorToScala(val rowIter: RowIterator) extends Iterator[InternalRow] {
private [this] var hasNextWasCalled: Boolean = false
private [this] var _hasNext: Boolean = false
override def hasNext: Boolean = {
// Idempotency:
if (!hasNextWasCalled) {
_hasNext = rowIter.advanceNext()
hasNextWasCalled = true
}
_hasNext
}
override def next(): InternalRow = {
if (!hasNext) throw new NoSuchElementException
hasNextWasCalled = false
rowIter.getRow
}
}

private final class RowIteratorFromScala(scalaIter: Iterator[InternalRow]) extends RowIterator {
private[this] var _next: InternalRow = null
override def advanceNext(): Boolean = {
if (scalaIter.hasNext) {
_next = scalaIter.next()
true
} else {
_next = null
false
}
}
override def getRow: InternalRow = _next
override def toScala: Iterator[InternalRow] = scalaIter
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,23 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}

/**
* Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be
* evaluated by matching hash keys.
* Uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the predicates
* can be evaluated by matching join keys.
*
* This strategy applies a simple optimization based on the estimates of the physical sizes of
* the two join sides. When planning a [[joins.BroadcastHashJoin]], if one side has an
* estimated physical size smaller than the user-settable threshold
* [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], the planner would mark it as the
* ''build'' relation and mark the other relation as the ''stream'' side. The build table will be
* ''broadcasted'' to all of the executors involved in the join, as a
* [[org.apache.spark.broadcast.Broadcast]] object. If both estimates exceed the threshold, they
* will instead be used to decide the build side in a [[joins.ShuffledHashJoin]].
* Join implementations are chosen with the following precedence:
*
* - Broadcast: if one side of the join has an estimated physical size that is smaller than the
* user-configurable [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold
* or if that side has an explicit broadcast hint (e.g. the user applied the
* [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side
* of the join will be broadcasted and the other side will be streamed, with no shuffling
* performed. If both sides of the join are eligible to be broadcasted then the
* - Sort merge: if the matching join keys are sortable and
* [[org.apache.spark.sql.SQLConf.SORTMERGE_JOIN]] is enabled (default), then sort merge join
* will be used.
* - Hash: will be chosen if neither of the above optimizations apply to this join.
*/
object HashJoin extends Strategy with PredicateHelper {
object EquiJoinSelection extends Strategy with PredicateHelper {

private[this] def makeBroadcastHashJoin(
leftKeys: Seq[Expression],
Expand All @@ -90,14 +94,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

// --- Inner joins --------------------------------------------------------------------------

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight)

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)

// If the sort merge join option is set, we want to use sort merge join prior to hashjoin
// for now let's support inner join first, then add outer join
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
val mergeJoin =
Expand All @@ -115,6 +120,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil

// --- Outer joins --------------------------------------------------------------------------

case ExtractEquiJoinKeys(
LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
joins.BroadcastHashOuterJoin(
Expand All @@ -125,10 +132,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
joins.BroadcastHashOuterJoin(
leftKeys, rightKeys, RightOuter, condition, planLater(left), planLater(right)) :: Nil

case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, left, right)
if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeOuterJoin(
leftKeys, rightKeys, LeftOuter, condition, planLater(left), planLater(right)) :: Nil

case ExtractEquiJoinKeys(RightOuter, leftKeys, rightKeys, condition, left, right)
if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeOuterJoin(
leftKeys, rightKeys, RightOuter, condition, planLater(left), planLater(right)) :: Nil

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
joins.ShuffledHashOuterJoin(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

// --- Cases where this strategy does not apply ---------------------------------------------

case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ case class BroadcastNestedLoopJoin(
left.output.map(_.withNullability(true)) ++ right.output
case FullOuter =>
left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
case _ =>
left.output ++ right.output
case x =>
throw new IllegalArgumentException(
s"BroadcastNestedLoopJoin should not take $x as the JoinType")
}
}

Expand Down
Loading