Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-25401] [SQL] Reorder join predicates to match child outputOrdering #23267

Closed

Conversation

davidvrba
Copy link
Contributor

What changes were proposed in this pull request?

In case of SortMergeJoin if tables are bucketed with keys (k1, k2) and sorted with keys (k2, k1), EnsureRequirements will add unnecessary SortExec. In this PR the improvement is that we reorder join predicate keys once more - to align it with child outputOrdering.

How was this patch tested?

Adding a new test.

@dongjoon-hyun
Copy link
Member

ok to test

@SparkQA
Copy link

SparkQA commented Dec 10, 2018

Test build #99889 has finished for PR 23267 at commit 6022e77.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davidvrba
Copy link
Contributor Author

cc @gatorsmile @cloud-fan @dongjoon-hyun can i ask for review please

@davidvrba
Copy link
Contributor Author

Could someone please take a look at this PR? @gatorsmile @cloud-fan @dongjoon-hyun @holdenk

@mn-mikke
Copy link
Contributor

cc @maropu @mgaido91

Copy link
Contributor

@mgaido91 mgaido91 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about this change. If we change the keys ordering for matching the sort order, that order doesn't match anymore the one for partitionings. This may introduce some problems.

I am leaving anyway some comments on the code, in case my general concern above is answered...

@@ -293,6 +314,21 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}
}

private def reorderJoinPredicatesForOrdering(plan: SparkPlan): SparkPlan = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can avoid this and include this transformation in the former reorderJoinPredicates method, after the reorder for partitionings. I'd rather have a reorderJoinKeysForOrderings called there or something similar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this would work. The point here was to first reorder join predicates for partitioning, then check for the child outputPartitioning (which happens in the method ensureDistributionAndOrdering) and decide if we need Exchange or not and AFTER that reorder the join predicates again to satisfy the child outputOrdering to avoid Exchange.

rightOrders.length == leftKeys.length &&
leftKeys.forall { x =>
(rightOrders.map(_.asInstanceOf[Expression])).exists(_.semanticEquals(x))} =>
reorder(leftKeys, rightKeys, rightOrders.map(_.asInstanceOf[Expression]), leftKeys)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reorder(leftKeys, rightKeys, rightOrders.map(_.asInstanceOf[Expression]), rigthKeys)

and please add a UT which fails before correcting this and passes after.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this UT test the reorderJoinKeys function? Or do you have something else in mind?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant a test like the one you added. But please, first do prove that the current solution is fine (since I doubt so, see #23267 (comment)). Once we ensure that the current change is safe, you can go ahead addressing these comments. Thanks.

@davidvrba
Copy link
Contributor Author

@mgaido91 Thank you very much for your comments. We are changing the key order at the end of the method ensureDistributionAndOrdering and at that moment the child outputPartitioning is already checked and therefore it is ok to change the order of the join keys - it is not going to add unnecessary Exchange for the mismatch.

@mgaido91
Copy link
Contributor

@davidvrba my point is: is it safe to do so? I mean, are we changing the plan 2 times potentially now: the first time we reorder the keys in order to accomplish with the partitioning, the second for the orderings.

So with the second change we are basically "undoing" part of the previous change, which consists of:

  1. changing the order of the keys so that they match the partitioning one;
  2. adding/not adding the Exchange according to the modified plan.

Hence I am not sure this change you are introducing is fine in general. My understanding is that the change here is not safe in all conditions, in particular in the case when 2 re-orderings occur. If you can show that all the possible cases are safe, then it is fine, but my feeling is that it is not.
Hope this more verbose comment explains more clearly what I meant. Thanks.

@@ -276,7 +297,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
* introduced). This rule will change the ordering of the join keys to match with the
* partitioning of the join nodes' children.
*/
private def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
Copy link
Member

@maropu maropu Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the historical reason (#19257 (comment)), this method was added as a workaround. So, I feel it is compliated to extend this method for this case... basically, IMO we need a general logic here to cover this case and more. cc: @cloud-fan

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@HyukjinKwon
Copy link
Member

Closing this due to author's inactivity.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants