[SPARK-25401] [SQL] Reorder join predicates to match child outputOrdering #23267
ok to test
Test build #99889 has finished for PR 23267 at commit
|
cc @gatorsmile @cloud-fan @dongjoon-hyun May I ask for a review, please?
Could someone please take a look at this PR? @gatorsmile @cloud-fan @dongjoon-hyun @holdenk
I am not sure about this change. If we change the key ordering to match the sort order, that ordering no longer matches the one used for the partitionings. This may introduce some problems.
I am leaving some comments on the code anyway, in case my general concern above is addressed...
@@ -293,6 +314,21 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}
}

private def reorderJoinPredicatesForOrdering(plan: SparkPlan): SparkPlan = {
I think we can avoid this and include this transformation in the former reorderJoinPredicates
method, after the reorder for partitionings. I'd rather have a reorderJoinKeysForOrderings
called there or something similar.
I am not sure this would work. The point here is to first reorder the join predicates for partitioning, then check the child outputPartitioning (which happens in the method ensureDistributionAndOrdering) to decide whether an Exchange is needed, and only AFTER that reorder the join predicates again to satisfy the child outputOrdering, so that no Exchange is added.
rightOrders.length == leftKeys.length &&
leftKeys.forall { x =>
(rightOrders.map(_.asInstanceOf[Expression])).exists(_.semanticEquals(x))} =>
reorder(leftKeys, rightKeys, rightOrders.map(_.asInstanceOf[Expression]), leftKeys)
reorder(leftKeys, rightKeys, rightOrders.map(_.asInstanceOf[Expression]), rightKeys)
and please add a UT which fails before correcting this and passes after.
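To make the argument pairing in this suggestion concrete, here is a minimal sketch of a reorder helper. It is a simplified model, not Spark's implementation: keys are plain strings, string equality stands in for semanticEquals, and the names `ReorderSketch`, `expectedOrder` and `currentOrder` are made up for illustration. The point it tries to show is that the last argument has to be the key list that the expected order is expressed in.

```scala
// Simplified model (assumption): join keys are strings, not Catalyst expressions,
// and string equality stands in for semanticEquals.
object ReorderSketch {
  // Reorders both key lists so that the keys of `currentOrder` follow `expectedOrder`.
  // Returns None when some expected key cannot be found in `currentOrder`.
  def reorder(
      leftKeys: IndexedSeq[String],
      rightKeys: IndexedSeq[String],
      expectedOrder: Seq[String],
      currentOrder: Seq[String]): Option[(Seq[String], Seq[String])] = {
    val indexed = currentOrder.zipWithIndex
    val init: Option[(Vector[Int], Set[Int])] = Some((Vector.empty[Int], Set.empty[Int]))
    // For each expected key, pick the first not-yet-used position holding that key.
    val picked = expectedOrder.foldLeft(init) {
      case (Some((order, used)), key) =>
        indexed
          .collectFirst { case (k, i) if k == key && !used(i) => i }
          .map(i => (order :+ i, used + i))
      case (None, _) => None
    }
    // Apply the same permutation to both sides so key pairs stay aligned.
    picked.map { case (order, _) =>
      (order.map(i => leftKeys(i)), order.map(i => rightKeys(i)))
    }
  }
}
```

With left keys (a1, a2), right keys (b1, b2) and a right-side sort order (b2, b1), passing the right keys as the current order yields (a2, a1) and (b2, b1). Passing the left keys instead finds no match in this model and returns None.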
Should this UT test the reorderJoinKeys function? Or do you have something else in mind?
I meant a test like the one you added. But first, please show that the current solution is sound (I doubt it is; see #23267 (comment)). Once we are sure that the current change is safe, you can go ahead and address these comments. Thanks.
@mgaido91 Thank you very much for your comments. We change the key order at the end of the method ensureDistributionAndOrdering. At that point the child outputPartitioning has already been checked, so it is safe to change the order of the join keys: doing so will not add an unnecessary Exchange because of the mismatch.
@davidvrba my point is: is it safe to do so? I mean, we are now potentially changing the plan twice: the first time we reorder the keys to comply with the partitioning, the second time for the orderings. So with the second change we are basically "undoing" part of the previous change, which consists of:
Hence I am not sure the change you are introducing is fine in general. My understanding is that it is not safe in all conditions, in particular when both reorderings occur. If you can show that all possible cases are safe, then it is fine, but my feeling is that they are not.
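The concern about two reorderings can be illustrated with a small sketch. It is a simplified model, not Spark code: keys are strings, the names `TwoPassSketch`, `Child`, `hashKeys` and `sortKeys` are hypothetical, and the partitioning check is assumed to compare keys position by position.

```scala
// Simplified model, not Spark code. Keys are plain strings, and the partitioning
// check is ASSUMED to be order-sensitive (position-by-position comparison).
object TwoPassSketch {
  final case class Child(hashKeys: Seq[String], sortKeys: Seq[String])

  // Assumed check: required keys must equal the child's hash keys, in order.
  def partitioningSatisfied(joinKeys: Seq[String], child: Child): Boolean =
    joinKeys == child.hashKeys

  // Pass 1: adopt the child's hash-key order when it is a permutation of the join keys.
  def reorderForPartitioning(joinKeys: Seq[String], child: Child): Seq[String] =
    if (joinKeys.sorted == child.hashKeys.sorted) child.hashKeys else joinKeys

  // Pass 2: adopt the child's sort-key order when it is a permutation of the join keys.
  def reorderForOrdering(joinKeys: Seq[String], child: Child): Seq[String] =
    if (joinKeys.sorted == child.sortKeys.sorted) child.sortKeys else joinKeys
}
```

For a child hashed on (k1, k2) and sorted on (k2, k1), pass 1 produces (k1, k2), which passes the partitioning check. Pass 2 then produces (k2, k1), which would fail that same check if it were evaluated again. Whether anything downstream re-evaluates it is exactly the open question in this thread.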
@@ -276,7 +297,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
 * introduced). This rule will change the ordering of the join keys to match with the
 * partitioning of the join nodes' children.
 */
private def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
Historically (#19257 (comment)), this method was added as a workaround. So, I feel it is complicated to extend this method for this case... basically, IMO we need more general logic here to cover this case and others. cc: @cloud-fan
Can one of the admins verify this patch?
Closing this due to author's inactivity.
What changes were proposed in this pull request?
For a SortMergeJoin, if the tables are bucketed by keys (k1, k2) and sorted by keys (k2, k1), EnsureRequirements adds an unnecessary SortExec. This PR reorders the join predicate keys once more, to align them with the child outputOrdering.
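The scenario above can be sketched in a few lines. It is a simplified, single-sided model rather than the code in this PR: keys and sort columns are plain strings, the names `SortAlignmentSketch`, `needsSort` and `alignWithOrdering` are hypothetical, and a sort is assumed to be unnecessary exactly when the child's existing sort order starts with the required keys.

```scala
// Simplified model, not Spark code: keys and sort columns are plain strings.
object SortAlignmentSketch {
  // Assumption: no sort is needed when the child's sort order starts with the required keys.
  def needsSort(requiredKeys: Seq[String], childSortKeys: Seq[String]): Boolean =
    !childSortKeys.startsWith(requiredKeys)

  // Adopts the child's sort order when it is a permutation of the join keys;
  // otherwise leaves the join keys unchanged.
  def alignWithOrdering(joinKeys: Seq[String], childSortKeys: Seq[String]): Seq[String] =
    if (childSortKeys.length == joinKeys.length && childSortKeys.sorted == joinKeys.sorted)
      childSortKeys
    else
      joinKeys
}
```

With join keys (k1, k2) and a child sorted on (k2, k1), `needsSort` is true before alignment and false after the keys are aligned to (k2, k1). In the real rule the left and right keys would have to be permuted together, which this one-sided model leaves out.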
How was this patch tested?
Adding a new test.