[SPARK-2205] [SQL] Avoid unnecessary exchange operators in multi-way joins #7773
case x =>
  throw new IllegalArgumentException(s"HashOuterJoin should not take $x as the JoinType")
}
I will remove this change for now. Once we have the `nullSafe` concept, we can better describe how the result of this join operator is partitioned. For example, right now it is not safe to say that the output of this operator is partitioned by the `rightKeys` when we have a left outer join, because rows with null keys are not clustered.
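
To illustrate the concern, here is a minimal sketch in plain Scala with no Spark dependency. The names `partitionOf` and `leftOuterJoin`, the partition count, and the sample rows are all assumptions made for illustration. Both inputs are hash-partitioned on their join keys and the join is evaluated partition by partition; unmatched left rows come out with a null (here `None`) right key in more than one partition, so the output cannot be described as clustered by the right key.

```scala
object NullKeyClustering {
  val numPartitions = 2

  // Simplified stand-in for hash partitioning on an integer join key.
  def partitionOf(key: Int): Int = Math.floorMod(key, numPartitions)

  // Left outer join within one partition: (leftKey, Option[rightKey]).
  def leftOuterJoin(left: Seq[Int], right: Seq[Int]): Seq[(Int, Option[Int])] =
    left.flatMap { l =>
      val matches = right.filter(_ == l)
      if (matches.isEmpty) Seq((l, Option.empty[Int]))
      else matches.map(r => (l, Option(r)))
    }

  // Hypothetical sample data: left keys 3 and 4 have no match on the right.
  val leftRows = Seq(1, 2, 3, 4)
  val rightRows = Seq(1, 2)

  // Output rows grouped by the partition in which they are produced.
  val outputByPartition: Map[Int, Seq[(Int, Option[Int])]] =
    (0 until numPartitions).map { p =>
      p -> leftOuterJoin(
        leftRows.filter(partitionOf(_) == p),
        rightRows.filter(partitionOf(_) == p))
    }.toMap

  // Partitions containing at least one row whose right key is null (None).
  val partitionsWithNullRightKey: Set[Int] =
    outputByPartition.collect { case (p, rows) if rows.exists(_._2.isEmpty) => p }.toSet
}
```

In this sketch the rows with a null right key end up in both partitions, which is the situation the comment above describes.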
@@ -57,6 +57,8 @@ case class BroadcastHashOuterJoin(
  override def requiredChildDistribution: Seq[Distribution] =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

  override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
This is a bug fix.
@@ -94,8 +95,12 @@ sealed trait Partitioning {
   */
  def compatibleWith(other: Partitioning): Boolean

  /** Returns the expressions that are used to key the partitioning. */
  def keyExpressions: Seq[Expression]
It seems `keyExpressions` is not used at all, and I do not remember when we added it, so I am removing it.
test("PartitioningCollection") {
  // First, we disable broadcast join.
  val origThreshold = conf.autoBroadcastJoinThreshold
This test could use the new SQLTestUtils `withConf` and `withTempTable` helper functions, I think.
I'll make this change now.
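
For readers unfamiliar with this kind of helper, the following is a simplified sketch of the save, override, and restore pattern that it captures. It is not Spark's SQLTestUtils API: the `conf` map, the key `example.threshold`, and this `withConf` signature are hypothetical stand-ins chosen so the example runs on its own.

```scala
import scala.collection.mutable

object ConfHelperSketch {
  // Hypothetical stand-in for a mutable SQL configuration.
  val conf: mutable.Map[String, String] = mutable.Map("example.threshold" -> "100")

  // Applies the given settings, runs `body`, then restores the previous
  // values even if `body` throws.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    // Remember the old value (or its absence) for every key we touch.
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None)      => conf.remove(k)
    }
  }
}
```

Compared with saving `origThreshold` by hand, the loan pattern keeps the restore step next to the override, so a failing assertion inside the test body cannot leave the setting changed for later tests.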
Jenkins, retest this please.

test("PartitioningCollection") {
  // First, we disable broadcast join.
  val origThreshold = conf.autoBroadcastJoinThreshold
  setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 0)
I think that you have to set `AUTO_BROADCASTJOIN_THRESHOLD` to `-1` to disable broadcast, not `0`.
Actually, it looks like the implementation might be out of sync w.r.t. the docs for `AUTO_BROADCASTJOIN_THRESHOLD`...
@@ -122,7 +127,10 @@ case object SinglePartition extends Partitioning {
    case _ => false
  }

  override def keyExpressions: Seq[Expression] = Nil
  override def guarantees(other: Partitioning): Boolean = other match {
Shouldn't `SinglePartition` technically guarantee any partitioning which produces a single partition, such as `HashPartitioning` with a single partition? I guess that hash partitioning with one partition shouldn't ever occur, though.
Let's wait and see if we can merge #7807 to remove
… from Exchange

While reviewing yhuai's patch for SPARK-2205 (#7773), I noticed that Exchange's `compatible` check may be incorrectly returning `false` in many cases. As far as I know, this is not actually a problem because the `compatible`, `meetsRequirements`, and `needsAnySort` checks are serving only as short-circuit performance optimizations that are not necessary for correctness.

In order to reduce code complexity, I think that we should remove these checks and unconditionally rewrite the operator's children. This should be safe because we rewrite the tree in a single bottom-up pass.

Author: Josh Rosen <[email protected]>

Closes #7807 from JoshRosen/SPARK-9489 and squashes the following commits:

9d76ce9 [Josh Rosen] [SPARK-9489] Remove compatibleWith, meetsRequirements, and needsAnySort checks from Exchange
Jenkins, retest this please.

   * guarantees the same partitioning scheme described by `other`.
   */
  // TODO: Add an example once we have the `nullSafe` concept.
  def guarantees(other: Partitioning): Boolean
Do you think `semanticEqual` is a better name? I think this method is basically doing an equality check. For example, if `other` is a `HashPartitioning('a :: Nil, 10)` and `this` is a `SinglePartition`, we probably do not want to return true, because the parent of this operator can be a join and the sibling of this operator can be HashPartitioned.
I think that we should only consider a name like `semanticEqual` or `semanticEquiv` if `a.guarantees(b)` implies `b.guarantees(a)` and vice-versa.
Yeah, makes sense. Then `semanticEqual` is not a good name, because once we have the concept of `nullSafe`, this method will not have the commutative property: a `nullSafe` hash partitioning can be treated as a `nullUnsafe` hash partitioning, but not the other way around.
Yep. Let's leave this for now.
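
The asymmetry discussed in this thread can be sketched as follows. This is only an illustration under stated assumptions: the `nullSafe` flag did not exist in Spark at the time of this PR, and `HashPart` and its `guarantees` rule are hypothetical simplifications rather than the actual `Partitioning` classes.

```scala
object GuaranteesSketch {
  // Hypothetical model of a hash partitioning carrying a `nullSafe` flag.
  final case class HashPart(keys: Seq[String], numPartitions: Int, nullSafe: Boolean) {
    // `this` guarantees `other` when the layout matches and `this` is at
    // least as strict about null keys as `other` requires.
    def guarantees(other: HashPart): Boolean =
      keys == other.keys &&
        numPartitions == other.numPartitions &&
        (nullSafe || !other.nullSafe)
  }

  val nullSafePart = HashPart(Seq("a"), 10, nullSafe = true)
  val nullUnsafePart = HashPart(Seq("a"), 10, nullSafe = false)

  // A null-safe partitioning can stand in for a null-unsafe one...
  val safeGuaranteesUnsafe: Boolean = nullSafePart.guarantees(nullUnsafePart)
  // ...but the reverse does not hold, so the relation is not symmetric.
  val unsafeGuaranteesSafe: Boolean = nullUnsafePart.guarantees(nullSafePart)
}
```

Under this model `guarantees` behaves like a one-directional "is at least as strong as" relation, which is why an equality-style name would be misleading.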
Jenkins, retest this please.
The failed test (HiveCompatibilitySuite's semijoin) is tracked by https://issues.apache.org/jira/browse/SPARK-9482.
test this please
@JoshRosen If you think changes in this PR are good, how about we merge it once it passes tests?
OK. I am merging it to master.
Yeah, sorry for timing out here. Consider this a post-hoc LGTM.
This PR adds `PartitioningCollection`, which is used to represent the `outputPartitioning` for SparkPlans with multiple children (e.g. `ShuffledHashJoin`). So, a `SparkPlan` can have multiple descriptions of its partitioning schemes. Taking `ShuffledHashJoin` as an example, it has two descriptions of its partitioning schemes, i.e. `left.outputPartitioning` and `right.outputPartitioning`. So when we have a query like `select * from t1 join t2 on (t1.x = t2.x) join t3 on (t2.x = t3.x)`, we will only have three Exchange operators (when shuffled joins are needed) instead of four.

The code in this PR was authored by @yhuai; I'm opening this PR to factor out this change from #7685, a larger pull request which contains two other optimizations.
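
The idea in the description can be sketched with a small standalone model. This is not the code from the PR: the classes below reuse the names `Partitioning`, `HashPartitioning`, and `PartitioningCollection` only for readability, `satisfiesClustering` uses exact key equality as a simplifying assumption, and `exchangeCount` is a hypothetical stand-in for the planner that hard-codes the two-join query from the description.

```scala
object PartitioningCollectionSketch {
  sealed trait Partitioning {
    // True when rows with equal values for `required` are co-located.
    def satisfiesClustering(required: Seq[String]): Boolean
  }

  final case class HashPartitioning(keys: Seq[String]) extends Partitioning {
    // Simplifying assumption: keys must match the requirement exactly.
    def satisfiesClustering(required: Seq[String]): Boolean = keys == required
  }

  // Several equally valid descriptions of the same output; a requirement is
  // met if any one of them meets it.
  final case class PartitioningCollection(partitionings: Seq[Partitioning])
      extends Partitioning {
    def satisfiesClustering(required: Seq[String]): Boolean =
      partitionings.exists(_.satisfiesClustering(required))
  }

  // Output of `t1 join t2 on (t1.x = t2.x)`, described by both sides' keys.
  val withCollection: Partitioning =
    PartitioningCollection(Seq(HashPartitioning(Seq("t1.x")), HashPartitioning(Seq("t2.x"))))

  // The same output described by the left side only.
  val leftOnly: Partitioning = HashPartitioning(Seq("t1.x"))

  // t1, t2, and t3 each need one exchange; the first join's output needs a
  // further one only if it does not already satisfy clustering on t2.x.
  def exchangeCount(firstJoinOutput: Partitioning): Int =
    3 + (if (firstJoinOutput.satisfiesClustering(Seq("t2.x"))) 0 else 1)
}
```

In this model, describing the first join's output by both `t1.x` and `t2.x` lets the second join reuse the existing layout, which gives three exchanges instead of four.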