[SPARK-22042] [SQL] ReorderJoinPredicates can break when child's partitioning is not decided #19257

Closed

tejasapatil
Contributor

What changes were proposed in this pull request?

See the JIRA description for the bug: https://issues.apache.org/jira/browse/SPARK-22042

The fix in this PR: in EnsureRequirements, apply ReorderJoinPredicates to the input tree before running the rule's core logic. Since the tree is transformed bottom-up, the children are guaranteed to be resolved before ReorderJoinPredicates runs on their parent.

In theory this covers all such cases while keeping the code simple. My only reservation is cosmetic: this PR will look odd because, to my knowledge, we don't call rules from other rules. I could have moved all the logic of ReorderJoinPredicates into EnsureRequirements, but that would make it a bit crowded. I am happy to discuss better options.
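To illustrate the bottom-up argument, here is a minimal, Spark-free sketch. The `Plan`, `Scan`, and `Join` types and the toy `transformUp` are hypothetical stand-ins, not Spark's classes. The only point is that a post-order rewrite visits both children before their parent, so a rule invoked at the join can assume its children were already handled.

```scala
import scala.collection.mutable.ArrayBuffer

object BottomUpSketch {
  // Toy plan nodes (assumptions for illustration, not Spark's SparkPlan).
  sealed trait Plan
  case class Scan(table: String, partitionedBy: Seq[String]) extends Plan
  case class Join(leftKeys: Seq[String], rightKeys: Seq[String], left: Plan, right: Plan)
    extends Plan

  // Post-order rewrite: rebuild the children first, then apply the rule to the node.
  def transformUp(plan: Plan)(rule: Plan => Plan): Plan = plan match {
    case j: Join =>
      rule(j.copy(left = transformUp(j.left)(rule), right = transformUp(j.right)(rule)))
    case leaf => rule(leaf)
  }

  // Records the order in which the rule sees the nodes.
  def visitOrder(plan: Plan): Seq[String] = {
    val seen = ArrayBuffer.empty[String]
    transformUp(plan) { node =>
      node match {
        case Scan(table, _) => seen += s"scan:$table"
        case _: Join => seen += "join"
      }
      node
    }
    seen.toList
  }
}
```

For a join over two scans, `visitOrder` lists both scans before the join, which is the ordering the fix relies on.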

How was this patch tested?

Added a new test case

@tejasapatil
Contributor Author

Jenkins test this please

@SparkQA

SparkQA commented Sep 17, 2017

Test build #81847 has finished for PR 19257 at commit 6ff4ed0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tejasapatil
Contributor Author

cc @cloud-fan @gatorsmile @sameeragarwal for review

@cloud-fan
Contributor

Maybe we need to rethink how the planning phase adds shuffles. How about adding a placeholder for the shuffle node and then replacing the placeholder with the actual shuffle node in EnsureRequirements? Then we can make sure the plan tree is always resolved.

@tejasapatil
Contributor Author

By "placeholder shuffle nodes", do you mean dummy ones? We need to know the exact partitioning of the children, which dummy nodes won't give (maybe I didn't get what you meant). My fear is that EnsureRequirements might make decisions about shuffles that would affect the results of the ReorderJoinPredicates rule.

I agree that we need to rethink how the planning phase adds shuffles.

@cloud-fan
Contributor

cloud-fan commented Sep 21, 2017

> We need to know the exact partitioning of the children which dummy nodes won't give

We only add the dummy shuffle node when it's necessary, e.g.

         hash-join
          /      \
    child1   child2

Let's say hash-join needs children to be clustered by a, b, and child1 is already partitioned by a, and child2 has no partitioning. After adding the dummy nodes:

         hash-join
          /      \
         /     dummy-shuffle
        /            |
    child1       child2

Now we still keep the exact partitioning, i.e. the left child is partitioned by a and the right child is partitioned by a, b.
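The placeholder idea can be sketched roughly as follows. Everything here is hypothetical: `Child`, `DummyShuffle`, and `ensureClustered` are illustrative names, and `satisfies` is a simplified assumption that a child partitioned on a non-empty subset of the required keys is already clustered by them.

```scala
object PlaceholderShuffleSketch {
  // Toy plan nodes that expose the partitioning they produce (not Spark classes).
  sealed trait Plan { def partitionedBy: Seq[String] }
  case class Child(name: String, partitionedBy: Seq[String]) extends Plan
  // Hypothetical placeholder: it records the partitioning a later rule must materialize.
  case class DummyShuffle(partitionedBy: Seq[String], child: Plan) extends Plan

  // Simplified assumption: partitioning on a non-empty subset of `keys` clusters by `keys`.
  def satisfies(child: Plan, keys: Seq[String]): Boolean =
    child.partitionedBy.nonEmpty && child.partitionedBy.forall(keys.contains)

  // Add the placeholder only when the child does not already meet the requirement.
  def ensureClustered(child: Plan, keys: Seq[String]): Plan =
    if (satisfies(child, keys)) child else DummyShuffle(keys, child)
}
```

With keys a, b, a child partitioned by a is returned unchanged, while an unpartitioned child is wrapped in a placeholder that reports partitioning a, b, so both sides of the join still expose an exact partitioning.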

@dongjoon-hyun
Member

Hi, All.
The master branch still has this problem. Can we proceed with this?

@tejasapatil
Contributor Author

@dongjoon-hyun: It will take me some time to get back to this. Having said that, it's not ideal to leave master in a bad state. How about disabling the rule by default (using a config)?

@tejasapatil
Contributor Author

Or we could move forward with the current approach and defer the refactoring of how shuffles are added in the planning phase.

@dongjoon-hyun
Member

Thank you for your update, @tejasapatil.

@cloud-fan and @gatorsmile, could you give us some direction?

@felixcheung
Member

How is this coming along? Would it be good to fix this in 2.2?

@cloud-fan
Contributor

@felixcheung Don't worry, the bug only exists in the master branch, so it won't block the 2.2.1 release. I have corrected the JIRA ticket's affected version to 2.3. I'm also looking into this issue.

|) c
|JOIN table2
|ON c.i = table2.i
|""".stripMargin).explain()
Contributor

Use checkAnswer instead of explain in the test.

Contributor Author

changed

@cloud-fan
Contributor

After some more thought, I think the best choice is to do planning bottom-up. That requires a lot of refactoring, so I'm fine with merging this workaround first.

LGTM except for one minor comment on the test.

@dongjoon-hyun
Member

Thank you for the decision, @cloud-fan. It's great to see the progress on this!

@@ -31,6 +32,8 @@ import org.apache.spark.sql.internal.SQLConf
  * input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
+  private val reorderJoinPredicates = new ReorderJoinPredicates
Member

Change class ReorderJoinPredicates to object ReorderJoinPredicates?

@@ -265,6 +268,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
           if (childPartitioning.guarantees(partitioning)) child else operator
         case _ => operator
       }
-    case operator: SparkPlan => ensureDistributionAndOrdering(operator)
+    case operator: SparkPlan =>
+      ensureDistributionAndOrdering(reorderJoinPredicates.apply(operator))
Member

Then, do something like

ensureDistributionAndOrdering(ReorderJoinPredicates(operator))
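As a rough illustration of this suggestion, a stateless rule declared as a Scala `object` can be invoked directly through its `apply` method, with no `new` and no field holding an instance. The `Rule` trait and `UpperCaseRule` below are hypothetical toys operating on strings, not Spark's `Rule[SparkPlan]`.

```scala
object ObjectRuleSketch {
  // Toy rule interface (an assumption for illustration, not Spark's Rule).
  trait Rule[T] { def apply(plan: T): T }

  // A rule with no state can be a singleton object.
  object UpperCaseRule extends Rule[String] {
    def apply(plan: String): String = plan.toUpperCase
  }

  // `UpperCaseRule(plan)` is shorthand for `UpperCaseRule.apply(plan)`.
  def run(plan: String): String = UpperCaseRule(plan)
}
```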

Member

@gatorsmile gatorsmile Nov 22, 2017

Could you add a comment to explain why we do it here? It is hard for newcomers to understand the assumptions we made here.

Contributor Author

@tejasapatil tejasapatil Nov 27, 2017

It feels like invoking a rule in this fashion is inconsistent with the rest of the codebase; from the point of view of someone new to the codebase, it will look odd. I removed the rule and instead moved these methods inside EnsureRequirements. Let me know how you feel about the changed version.

@tejasapatil tejasapatil force-pushed the SPARK-22042_ReorderJoinPredicates branch from 6ff4ed0 to d9620ef Compare November 27, 2017 22:29
@SparkQA

SparkQA commented Nov 28, 2017

Test build #84232 has finished for PR 19257 at commit d9620ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ash211
Contributor

ash211 commented Nov 29, 2017

@cloud-fan @gatorsmile any more changes needed on this PR before merging? I don't see any unaddressed comments left.

@dongjoon-hyun
Member

Gentle ping~

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Dec 13, 2017

Test build #84806 has finished for PR 19257 at commit d9620ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* partitioning of the join nodes' children.
*/
def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
def reorderJoinKeys(
Member

We prefer not to use nested functions.

* introduced). This rule will change the ordering of the join keys to match with the
* partitioning of the join nodes' children.
*/
def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
Member

private

rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {

def reorder(expectedOrderOfKeys: Seq[Expression],
currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
Member

indents.
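For readers following the review, the key-reordering idea described in the doc comment (changing the order of the join keys to match the partitioning of the join's children) can be sketched as below. This is a simplified illustration, not the code under review: keys are plain strings, string equality stands in for whatever expression comparison the real code uses, and the signature of `reorderJoinKeys` here is an assumption that only considers the left child's partitioning.

```scala
object ReorderKeysSketch {
  // Reorder both key lists in lockstep so they follow `expectedOrder`.
  // Assumes every expected key occurs in `currentOrder`.
  def reorder(
      leftKeys: Seq[String],
      rightKeys: Seq[String],
      expectedOrder: Seq[String],
      currentOrder: Seq[String]): (Seq[String], Seq[String]) = {
    val indices = expectedOrder.map(key => currentOrder.indexOf(key))
    (indices.map(i => leftKeys(i)), indices.map(i => rightKeys(i)))
  }

  // Reorder only when the left child's partitioning uses exactly the left join keys.
  def reorderJoinKeys(
      leftKeys: Seq[String],
      rightKeys: Seq[String],
      leftPartitioning: Seq[String]): (Seq[String], Seq[String]) =
    if (leftPartitioning.length == leftKeys.length &&
        leftKeys.forall(leftPartitioning.contains)) {
      reorder(leftKeys, rightKeys, leftPartitioning, leftKeys)
    } else {
      (leftKeys, rightKeys)
    }
}
```

Reordering the left and right keys by the same indices keeps each pair of join keys aligned, so the join condition itself is unchanged.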

|ON a.i = b.i
|JOIN table2 c
|ON a.i = c.i
|""".stripMargin))
Member

@gatorsmile gatorsmile Dec 13, 2017

Please follow the other test cases.

"""
  | xyz
"""

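A small, self-contained sketch of the multi-line string layout suggested above, with the `|` margins indented under the opening quotes. The query text and the table names are illustrative only, not taken from the test under review.

```scala
object StripMarginSketch {
  // stripMargin removes leading whitespace up to and including each `|`.
  val query: String =
    """
      |SELECT *
      |FROM table1 a
      |JOIN table2 b
      |ON a.i = b.i
    """.stripMargin
}
```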
@gatorsmile
Member

LGTM except for a few style comments. We can merge it and fix them in a follow-up PR. Thanks!

@gatorsmile
Member

Thanks! Merged to master.

@tejasapatil
Contributor Author

Created #20041 to address the follow-up comments by @gatorsmile.

@tejasapatil tejasapatil deleted the SPARK-22042_ReorderJoinPredicates branch December 21, 2017 00:42
asfgit pushed a commit that referenced this pull request Dec 21, 2017
…ild's partitioning is not decided

## What changes were proposed in this pull request?

This is a follow-up PR to #19257, where gatorsmile left a couple of comments regarding code style.

## How was this patch tested?

Doesn't change any functionality. Relies on the build to verify that no checkstyle rules are violated.

Author: Tejas Patil <[email protected]>

Closes #20041 from tejasapatil/followup_19257.