[SPARK-12503][SPARK-12505] Limit pushdown in UNION ALL and OUTER JOIN #11121
Conversation
Test build #50940 has finished for PR 11121 at commit …
```scala
      case RightOuter => join.copy(right = maybePushLimit(exp, right))
      case LeftOuter => join.copy(left = maybePushLimit(exp, left))
      case FullOuter =>
        join.copy(left = maybePushLimit(exp, left), right = maybePushLimit(exp, right))
```
This is not right. Please check the original PR. @yhuai and I had a discussion about this issue.
Since a full outer join matches rows from both sides, we are unable to add the extra limit to both sides. As long as we ensure the completeness of one child, the generated results will still be correct, as with the left/right outer joins.
Thanks!
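To make this concrete, here is a minimal, self-contained Scala sketch (plain collections, not Catalyst; `fullOuter` is a toy helper introduced only for illustration) of how limiting both inputs of a full outer join manufactures rows the original query could never return:

```scala
// Toy full outer join over Seq[Int]: matched pairs plus null-padded leftovers.
def fullOuter(l: Seq[Int], r: Seq[Int]): Seq[(Option[Int], Option[Int])] = {
  val matched   = for (x <- l; y <- r if x == y) yield (Some(x), Some(y))
  val leftOnly  = l.filterNot(x => r.contains(x)).map(x => (Some(x), None))
  val rightOnly = r.filterNot(y => l.contains(y)).map(y => (None, Some(y)))
  matched ++ leftOnly ++ rightOnly
}

val left  = Seq(1, 2, 3)
val right = Seq(3, 2, 1)

// Correct: join the complete inputs, then take 2. Every key matches here,
// so no result row is null-padded.
val correct = fullOuter(left, right).take(2)

// Wrong: push LIMIT 2 into both sides first. Now left = Seq(1, 2) and
// right = Seq(3, 2), so key 1 loses its match on the right and key 3 loses
// its match on the left. The join emits (Some(1), None) and (None, Some(3)),
// null-padded rows that a limit over the unpushed join could never produce.
val wrong = fullOuter(left.take(2), right.take(2))
```

Keeping one side complete, as with left/right outer joins, avoids manufacturing such rows.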
I'll fix this up now and will add a brief comment summarizing this.
Great! Please also update the description of this PR. Thanks!
I have one concern about the rule as implemented in your PR:
If we have a full outer join that initially has neither of its children limited, and we then push a limit to the side with larger statistics, a second firing of the `LimitPushDown` rule would match one of the cases where only a single side is limited and would push a limit to the other side, leading to a wrong answer because we would have limited both sides.
Therefore, I think we should restrict this rule to fire only when neither side of the full outer join has a pre-existing limit.
Also, I wonder whether we should check whether `maxRows` is defined rather than checking whether the outer join's children are `Limit`s, since that frees us from having to reason about whether the limit could be pushed further. On the other hand, if we always leave the original `LocalLimit` in place, then I don't think we currently need to worry about the limit being pushed down to a point where the child is no longer a limit.
Yeah, you are right. If one side has a pre-existing limit, we just need to tighten the limit on that side. Of course, two adjacent limits can be combined.
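Putting the pieces of this thread together, the `FullOuter` case could plausibly end up shaped like this; a sketch only, in the style of the diff fragment above, not the PR's verbatim code. It assumes Catalyst's `statistics.sizeInBytes` estimate, and `maybePushLimit` is the PR's helper that wraps a child in `LocalLimit` only when its row bound is unknown or exceeds the limit:

```scala
case FullOuter =>
  (left.maxRows, right.maxRows) match {
    // Neither side is bounded yet: push to at most one side, choosing the
    // side that statistics estimate to be larger.
    case (None, None) =>
      if (left.statistics.sizeInBytes >= right.statistics.sizeInBytes) {
        join.copy(left = maybePushLimit(exp, left))
      } else {
        join.copy(right = maybePushLimit(exp, right))
      }
    // Exactly one side is already bounded: it is safe to tighten that side.
    case (Some(_), None) => join.copy(left = maybePushLimit(exp, left))
    case (None, Some(_)) => join.copy(right = maybePushLimit(exp, right))
    // Both sides bounded: do nothing, so a second firing of the rule cannot
    // limit the remaining side and produce the wrong answer described above.
    case _ => join
  }
```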
Yeah, `maxRows` was added for this purpose. The original idea is from @marmbrus.
…ds to be idempotent.
Alright, updated to address the …
```scala
    val optimized = Optimize.execute(originalQuery.analyze)
    val correctAnswer = Limit(1, x.join(LocalLimit(1, yBig), FullOuter)).analyze
    comparePlans(optimized, correctAnswer)
  }
```
I'll also add tests for the cases where both inputs are limited.
I've added more tests now.
Test build #50947 has finished for PR 11121 at commit …
Note: when we merge this, we should remove the triggering of the rule from the optimizer, and only add it back once we have whole-stage codegen for Limit.
Test build #51045 has finished for PR 11121 at commit …
Test build #51052 has finished for PR 11121 at commit …
I've updated this to disable the optimizer rule for now (it's still tested in the LimitPushdownSuite, though).
cc @cloud-fan for review
```scala
   * Any operator that a Limit can be pushed past should override this function (e.g., Union).
   * Any operator that can push through a Limit should override this function (e.g., Project).
   */
  def maxRows: Option[Expression] = None
```
Are we going to handle a non-literal `maxRows` in the future? If not, maybe defining it as `Option[Long]` is simpler and better?
I experimented with this but ran into problems because the argument of `LIMIT` can be an expression.
How about returning `None` if the argument of `Limit` is a non-literal expression?
+1
It feels to me that this is only useful when we know the value, not if it is some subquery.
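A sketch of that suggestion, shown as a method-body fragment in the style of the diffs above (assumed shape using Catalyst's `IntegerLiteral` extractor, not the PR's final code): a limit operator reports a row bound only when its argument is an integer literal.

```scala
// Inside a limit operator that holds `limitExpr: Expression`:
override def maxRows: Option[Long] = limitExpr match {
  case IntegerLiteral(n) => Some(n)  // constant limit: the bound is known
  case _                 => None     // non-literal (e.g., subquery): unknown
}
```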
How does this interact with the constant-folding rules? Will expressions be maximally constant-folded before this is invoked? Just trying to reason about whether there are any ordering issues here.
It's a `def`, not a `lazy val`, so I think it's fine with the constant-folding rules.
BTW, using `Option[Expression]` may be sub-optimal for something like `Union`, whose `maxRows` would be `Some(children.flatMap(_.maxRows).reduce { (a, b) => Add(a, b) })`: that can't be constant-folded (`maxRows` is a method) and will always be a non-literal expression, so we couldn't push a limit through it.
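To illustrate this point, here is a self-contained toy (hypothetical `Plan`/`Leaf`/`Union` classes, not Catalyst) showing how `Option[Long]` composes through a union as a plain number, with no constant folding needed before a pushdown rule can compare it to the limit:

```scala
sealed trait Plan { def maxRows: Option[Long] }

case class Leaf(rows: Long) extends Plan {
  def maxRows: Option[Long] = Some(rows)
}

case class Union(children: Seq[Plan]) extends Plan {
  // The union's bound is the sum of its children's bounds, or unknown if any
  // child's bound is unknown. No Add expression to fold: it's just a Long.
  def maxRows: Option[Long] =
    if (children.exists(_.maxRows.isEmpty)) None
    else Some(children.flatMap(_.maxRows).sum)
}

// A pushdown guard can compare numbers directly: push only when the child
// might produce more rows than the limit allows.
def shouldPush(limit: Long, child: Plan): Boolean =
  child.maxRows.forall(_ > limit)

val u = Union(Seq(Leaf(10), Leaf(5)))
assert(u.maxRows == Some(15L))
assert(shouldPush(7, u))  // 15 > 7: worth pushing a local limit of 7
```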
LGTM except one comment
I've updated this patch to change `maxRows` to return `Option[Long]`.
Jenkins, retest this please.
Test build #51227 has finished for PR 11121 at commit …
#11171 made a few changes to the conversion from logical plan to SQL. I believe this is the cause of the build failure.
Test build #51234 has finished for PR 11121 at commit …
I've updated this to fix compilation after that SQLBuilder change. One quick question: do we have to worry about the logical plan -> SQL conversion being applied to optimized plans? If so, things might get tricky because we now have separate GlobalLimit and LocalLimit logical plan nodes. Because of the …
IMO, if we need to convert optimized plans to SQL in the future, we will need to add a few rules to SQLBuilder to revert the changes made by some optimization rules; otherwise, the parser will be unable to parse the generated SQL. I already hit a couple of issues caused by Analyzer rules. Also cc @liancheng
We could consider adding a partition-local limit, or some hint, in the parser at some point. cc @hvanhovell
To confirm: is merging this patch blocked on anything, or can concerns related to converting optimized plans to SQL be addressed in a followup patch? Would you like me to add explicit test cases for plan -> SQL generation for a variety of queries involving limits?
I think this is good to go; we can defer the "converting optimized plans to SQL" work to a followup patch.
Yea - I don't think we ever turn an optimized plan into SQL right now.
@cloud-fan I agree. For SQL generation, we can currently focus only on resolved plans parsed from HiveQL. @gatorsmile I think after finishing that part, we may gain better knowledge about how to handle arbitrary resolved logical plans. What do you think?
I'm going to merge this into master.
@liancheng I agree. : ) |
This patch adds a new optimizer rule for performing limit pushdown. Limits will now be pushed down in two cases:

- If the child of a limit is a `UNION ALL` operator, then a partition-local limit operator will be pushed to each of the union operator's children.
- If the child of a limit is an `OUTER JOIN`, then a partition-local limit will be pushed to one side of the join. For `LEFT OUTER` and `RIGHT OUTER` joins, the limit will be pushed to the left and right side, respectively. For `FULL OUTER` joins, we will only push limits when at most one of the inputs is already limited: if one input is limited, we will push a smaller limit on top of it, and if neither input is limited, then we will limit the input which is estimated to be larger.

These optimizations were proposed previously by @gatorsmile in #10451 and #10454, but those earlier PRs were closed and deferred for later because, at the time, Spark's physical `Limit` operator would trigger a full shuffle to perform global limits, so there was a chance that pushdowns could actually harm performance by causing additional shuffles/stages. In #7334, we split the `Limit` operator into separate `LocalLimit` and `GlobalLimit` operators, so we can now push down only local limits (which don't require extra shuffles). This patch is based on both of @gatorsmile's patches, with changes and simplifications due to partition-local limiting.

When we push down the limit, we still keep the original limit in place, so we need a mechanism to ensure that the optimizer rule doesn't keep pattern-matching once the limit has been pushed down. To handle this, this patch adds a `maxRows` method to `SparkPlan` which returns the maximum number of rows that the plan can compute, then defines the pushdown rules to push limits to children only if the children's `maxRows` exceed the limit's `maxRows`. This idea is carried over from #10451; see that patch for additional discussion.
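For reference, the two rewrites in test-DSL style, as in the LimitPushdownSuite snippet earlier in this thread (`x` and `y` are hypothetical local relations; this illustrates the rule's output shape, not a verbatim test):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.LeftOuter
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LocalRelation}

val x = LocalRelation('a.int)
val y = LocalRelation('b.int)

// UNION ALL: Limit(1, x.union(y)) becomes a global limit over a union whose
// children each carry a partition-local limit.
val unionAfter = Limit(1, LocalLimit(1, x).union(LocalLimit(1, y)))

// LEFT OUTER JOIN: only the left (preserved) side gets the local limit; the
// right side must stay complete so that matches are not lost.
val leftOuterAfter = Limit(1, LocalLimit(1, x).join(y, LeftOuter))
```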