[SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer join #7904
Closed
62 commits
be19a0f [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use new… (JoshRosen)
34b8e0c Import ordering (JoshRosen)
e610655 Add comment RE: Ascending ordering (JoshRosen)
df88548 Squash @adrian-wang's changes. (adrian-wang)
58edb2e Remove old TODO (JoshRosen)
9faa2ee Use withSQLConf in JoinSuite (JoshRosen)
8d83e15 Use explicit toScala conversions in ShuffledHashOuterJoin. (JoshRosen)
a471a6e Revert changes to SortMergeJoin; add new SortMergeOuterJoin operator (JoshRosen)
cf8c042 Fix join operator selection for outer join: (JoshRosen)
a09d6e3 Rename HashOuterJoin to OuterJoin. (JoshRosen)
58b2d1c Clean up non-obvious side-effect in JoinedRow.with[Left|Right] (JoshRosen)
07ef478 Style cleanup in flatMap; use curly braces instead of parens. (JoshRosen)
c3c7ed4 Move initialize() definition closer to usage. (JoshRosen)
78714dd Large refactoring of SMJ internals to improve clarity. (JoshRosen)
124f4ba Remove unnecessary row copying. (JoshRosen)
8c50c30 Support SMJ for left outer join. (JoshRosen)
8dade55 Also enable for right outer join. (JoshRosen)
8e496b2 Fix scalastyle (JoshRosen)
6587ef2 Rewrite OuterJoinSuite in preparation for adding more tests. (JoshRosen)
a8d1074 Merge branch 'SPARK-9054' into outer-join-smj (JoshRosen)
4603081 Merge remote-tracking branch 'origin/master' into outer-join-smj (JoshRosen)
681e879 Add tests for outer joins with both inputs empty (JoshRosen)
3772505 Fix two minor bugs in SMJ (regression tests pending) (JoshRosen)
716bdff Merge remote-tracking branch 'origin/master' into outer-join-smj (JoshRosen)
82632c8 Allow UnsafeRows to be processed in SortMergeJoin (JoshRosen)
6e18bc3 Rename HashJoin to EquiJoinSelection (JoshRosen)
289e91d Remove unnecessary requiredChildDistribution from BroadcastHashOuterJoin (JoshRosen)
e3f6d71 Use ArrayBuffer instead of CompactBuffer (JoshRosen)
075f372 Add missing row key null checks in BroadcastHashOuterJoin (JoshRosen)
df250c8 Update to reflect deferral of full outer join to followup patch (JoshRosen)
6bbde8c Merge remote-tracking branch 'origin/master' into outer-join-smj (JoshRosen)
bdf513c Rename build to buffered (JoshRosen)
1d8a48c Update SortMergeJoin to output UnsafeRow in Unsafe mode (JoshRosen)
82b7e45 Try to clean up confusingly dense one-liner (JoshRosen)
4a4590f Commment update (JoshRosen)
93723e2 Experiment towards using efficient internal iterators. (JoshRosen)
7712f7e Merge remote-tracking branch 'origin/master' into outer-join-smj (JoshRosen)
441b89a Back out now-unnecessary changes to other OuterJoin operators. (JoshRosen)
d16b60a Revert another unnecessary change. (JoshRosen)
2c1253f Add RowIterator.fromScala and use it to guarantee that copying is unn… (JoshRosen)
2b68452 Override row format methods for SortMergeOuterJoin (JoshRosen)
db80faa Merge remote-tracking branch 'origin/master' into outer-join-smj (JoshRosen)
d41ac51 Fix bug in advancing leftIdx/rightIdx. (JoshRosen)
9f48a5c Efficiency improvement in boundCondition. (JoshRosen)
1813a45 For left and right outer joins, streamed rows should not have null jo… (JoshRosen)
f183307 Two minor comments on output ordering (JoshRosen)
f456086 Add note RE: non-nullability of streamed side's join keys. (JoshRosen)
2e5eb2d Fix loss of rows when removing RowIteratorToScala wrapper. (JoshRosen)
a7a24f5 Use RowIterator in SortMergeJoin as well (JoshRosen)
7910e83 Add giant comment to RowIterator (JoshRosen)
fd439cb Move RowIterator to execution package (JoshRosen)
51ee4b2 Remove incorrect assertions; the non-join-key columns can be null (JoshRosen)
e23db3d Experiment with removing copy (JoshRosen)
f701652 Fix incorrectly-placed null check. (JoshRosen)
7d3cc5d It turns out that the copy is unnecessary. (JoshRosen)
f83b412 Push null check into buffered iterator next(). (JoshRosen)
81956b0 Improve unit test coverage of join physical operators. (JoshRosen)
899dce2 Expand test data to cover multiple buffered rows per group. (JoshRosen)
e79909e Fix parallelism in join operator unit tests. (JoshRosen)
5c34f75 Add regression test exposing bug with missing while loop (JoshRosen)
c188a21 Fix while loops while adding regression tests. (JoshRosen)
eabacca comment updates (JoshRosen)
93 changes: 93 additions & 0 deletions
sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala
@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import java.util.NoSuchElementException

import org.apache.spark.sql.catalyst.InternalRow

/**
 * An internal iterator interface which presents a more restrictive API than
 * [[scala.collection.Iterator]].
 *
 * One major departure from the Scala iterator API is the fusing of the `hasNext()` and `next()`
 * calls: Scala's iterator allows users to call `hasNext()` without immediately advancing the
 * iterator to consume the next row, whereas RowIterator combines these calls into a single
 * [[advanceNext()]] method.
 */
private[sql] abstract class RowIterator {
  /**
   * Advance this iterator by a single row. Returns `false` if this iterator has no more rows
   * and `true` otherwise. If this returns `true`, then the new row can be retrieved by calling
   * [[getRow]].
   */
  def advanceNext(): Boolean

  /**
   * Retrieve the row from this iterator. This method is idempotent. It is illegal to call this
   * method after [[advanceNext()]] has returned `false`.
   */
  def getRow: InternalRow

  /**
   * Convert this RowIterator into a [[scala.collection.Iterator]].
   */
  def toScala: Iterator[InternalRow] = new RowIteratorToScala(this)
}

object RowIterator {
  def fromScala(scalaIter: Iterator[InternalRow]): RowIterator = {
    scalaIter match {
      case wrappedRowIter: RowIteratorToScala => wrappedRowIter.rowIter
      case _ => new RowIteratorFromScala(scalaIter)
    }
  }
}

private final class RowIteratorToScala(val rowIter: RowIterator) extends Iterator[InternalRow] {
  private[this] var hasNextWasCalled: Boolean = false
  private[this] var _hasNext: Boolean = false
  override def hasNext: Boolean = {
    // Idempotency:
    if (!hasNextWasCalled) {
      _hasNext = rowIter.advanceNext()
      hasNextWasCalled = true
    }
    _hasNext
  }
  override def next(): InternalRow = {
    if (!hasNext) throw new NoSuchElementException
    hasNextWasCalled = false
    rowIter.getRow
  }
}

private final class RowIteratorFromScala(scalaIter: Iterator[InternalRow]) extends RowIterator {
  private[this] var _next: InternalRow = null
  override def advanceNext(): Boolean = {
    if (scalaIter.hasNext) {
      _next = scalaIter.next()
      true
    } else {
      _next = null
      false
    }
  }
  override def getRow: InternalRow = _next
  override def toScala: Iterator[InternalRow] = scalaIter
}
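The contract in this file can be exercised outside Spark with a small stand-in. What follows is a minimal sketch, not the PR's code: `Row` (a `Seq[Any]`) is an assumed substitute for `InternalRow`, and `RowIteratorSketch`, `drain`, and `countdown` are hypothetical names introduced only for illustration. It shows the fused `advanceNext()`/`getRow` consumer loop and how `fromScala` unwraps an iterator that `toScala` produced, so a round trip adds no extra wrapper layer.

```scala
// Spark-free sketch of the RowIterator contract from the diff above.
// Assumption: Row stands in for InternalRow so this compiles on its own.
object RowIteratorSketch {
  type Row = Seq[Any]

  abstract class RowIterator {
    def advanceNext(): Boolean
    def getRow: Row
    def toScala: Iterator[Row] = new RowIteratorToScala(this)
  }

  object RowIterator {
    // Unwrap instead of double-wrapping when the input came from toScala.
    def fromScala(scalaIter: Iterator[Row]): RowIterator = scalaIter match {
      case wrapped: RowIteratorToScala => wrapped.rowIter
      case _ => new RowIteratorFromScala(scalaIter)
    }
  }

  final class RowIteratorToScala(val rowIter: RowIterator) extends Iterator[Row] {
    private var hasNextWasCalled = false
    private var cachedHasNext = false
    override def hasNext: Boolean = {
      // Advance at most once per element, however often hasNext is called.
      if (!hasNextWasCalled) {
        cachedHasNext = rowIter.advanceNext()
        hasNextWasCalled = true
      }
      cachedHasNext
    }
    override def next(): Row = {
      if (!hasNext) throw new NoSuchElementException
      hasNextWasCalled = false
      rowIter.getRow
    }
  }

  final class RowIteratorFromScala(scalaIter: Iterator[Row]) extends RowIterator {
    private var current: Row = null
    override def advanceNext(): Boolean =
      if (scalaIter.hasNext) { current = scalaIter.next(); true }
      else { current = null; false }
    override def getRow: Row = current
    override def toScala: Iterator[Row] = scalaIter
  }

  // Hypothetical source for demonstration: yields Seq(n), Seq(n - 1), ..., Seq(1).
  def countdown(n: Int): RowIterator = new RowIterator {
    private var current = n + 1
    override def advanceNext(): Boolean = { current -= 1; current >= 1 }
    override def getRow: Row = Seq(current)
  }

  // Typical consumer loop: one call both tests for and moves to the next row.
  def drain(iter: RowIterator): Vector[Row] = {
    val out = Vector.newBuilder[Row]
    while (iter.advanceNext()) out += iter.getRow
    out.result()
  }
}
```

In this sketch, `RowIterator.fromScala(it.toScala)` returns `it` itself for a custom iterator, and `RowIterator.fromScala(scalaIter).toScala` returns the original `scalaIter`, which mirrors the pattern match in `fromScala` and the `toScala` override in `RowIteratorFromScala`.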
Why do we need `RowIterator`? (We do not need to make any change for this in this PR.)
We don't need it anymore, strictly speaking, although I think it's a nicer API to implement against.
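One possible reading of "nicer to implement against" is that an operator written as a `RowIterator` needs no lookahead bookkeeping, whereas the same operator written as a Scala `Iterator` has to buffer a row so that `hasNext` can answer without consuming it. The sketch below illustrates that difference with a filter. It is an assumption-laden example rather than anything from this PR: `FilterSketch`, `FilteredRowIterator`, `FilteredScalaIterator`, and `fromSeq` are hypothetical names, and `Row` is again a stand-in for `InternalRow`.

```scala
// Hypothetical filter written twice, to compare the two iterator styles.
// Assumption: Row stands in for InternalRow; none of these classes exist in Spark.
object FilterSketch {
  type Row = Seq[Any]

  abstract class RowIterator {
    def advanceNext(): Boolean
    def getRow: Row
  }

  // RowIterator style: no extra state, the input's current row is the output row.
  final class FilteredRowIterator(input: RowIterator, keep: Row => Boolean)
      extends RowIterator {
    override def advanceNext(): Boolean = {
      while (input.advanceNext()) {
        if (keep(input.getRow)) return true
      }
      false
    }
    override def getRow: Row = input.getRow
  }

  // Scala Iterator style: hasNext must find and remember the next match.
  final class FilteredScalaIterator(input: Iterator[Row], keep: Row => Boolean)
      extends Iterator[Row] {
    private var lookahead: Row = null
    private var hasLookahead = false
    override def hasNext: Boolean = {
      while (!hasLookahead && input.hasNext) {
        val candidate = input.next()
        if (keep(candidate)) { lookahead = candidate; hasLookahead = true }
      }
      hasLookahead
    }
    override def next(): Row = {
      if (!hasNext) throw new NoSuchElementException
      hasLookahead = false
      lookahead
    }
  }

  // Helper for demonstration: expose a Seq through the RowIterator interface.
  def fromSeq(rows: Seq[Row]): RowIterator = new RowIterator {
    private val underlying = rows.iterator
    private var current: Row = null
    override def advanceNext(): Boolean =
      if (underlying.hasNext) { current = underlying.next(); true }
      else { current = null; false }
    override def getRow: Row = current
  }
}
```

Both versions yield the same rows. The `RowIterator` version simply has less state to keep consistent, which is the kind of simplification the reply may be alluding to.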