[SPARK-12503] [SQL] Pushing Limit Through Union All #10451
Conversation
Test build #48248 has finished for PR 10451 at commit
Limit(exp, left),
Limit(exp, right)
)
)
A bug exists here. Will fix it soon. Thanks!
Test build #48261 has finished for PR 10451 at commit
CombineLimits(Limit(exp, left)),
CombineLimits(Limit(exp, right))
)
)
we need a stop condition, or it will keep pushing Limit forever
Thank you for your review! Since we call CombineLimits here, we will not add an extra Limit in the subsequent iteration. Thus, the plan will not change, so the rule will stop automatically, right?
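A self-contained sketch of this convergence argument, using hypothetical simplified plan nodes (not Catalyst's actual classes): after one application, CombineLimits collapses any re-pushed Limit into the existing child Limit, so re-applying the rule is a no-op.

```scala
sealed trait Plan
case class Scan(name: String) extends Plan
case class Limit(n: Int, child: Plan) extends Plan
case class Union(left: Plan, right: Plan) extends Plan

// CombineLimits: collapse nested Limits into a single Limit with the min value.
def combineLimits(p: Plan): Plan = p match {
  case Limit(a, Limit(b, c)) => combineLimits(Limit(math.min(a, b), c))
  case Limit(a, c)           => Limit(a, combineLimits(c))
  case Union(l, r)           => Union(combineLimits(l), combineLimits(r))
  case s: Scan               => s
}

// Push a Limit through Union All, combining with any existing child Limit.
def pushLimitThroughUnion(p: Plan): Plan = p match {
  case Limit(n, Union(l, r)) =>
    Limit(n, Union(combineLimits(Limit(n, l)), combineLimits(Limit(n, r))))
  case other => other
}

val once  = pushLimitThroughUnion(Limit(5, Union(Scan("a"), Scan("b"))))
val twice = pushLimitThroughUnion(once)
assert(once == twice) // fixed point: re-applying the rule changes nothing
```

This only holds as long as nothing else removes the child Limits, which is exactly the concern raised below.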
After thinking about it more, there may be a problem: if `left` or `right` is an operator that can push Limit down (currently there is no such operator, but we can't guarantee there won't be), then every time you push down a Limit here, it will be pushed down further. Thus CombineLimits can NOT detect that you have already pushed the Limit down, and it keeps generating new Limits and pushing them down.
I think we should have a better way to detect whether we have pushed Limit down or not, or add some comments saying that this rule assumes the newly added Limit on top of `left` and `right` won't be removed by other optimization rules.
You are right. Limit might not converge to the same position after multiple pushdowns. Let me think about it. Thank you!
Test build #48299 has finished for PR 10451 at commit
Test build #48400 has finished for PR 10451 at commit
Thanks for working on this. I think it's getting pretty close. A few minor cleanups that might be nice:
// push-down rule that is unable to infer the value of maxRows. Any operator that a Limit can
// be pushed past should override this function.
case Limit(exp, Union(left, right))
  if left.maxRows.isEmpty || right.maxRows.isEmpty =>
Is there a reason to not check left and right separately?
Below is the example. If one side has a limit child/descendant, we still can push it down to reduce the number of returned rows.
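The author's original example is not reproduced here; as a hedged stand-in, a toy sketch with plain Scala collections (`take(n)` playing the role of `Limit(n)`, hypothetical data) shows why pushdown still helps when one side already has a limit:

```scala
// Hypothetical toy data standing in for the two Union children.
val leftSide  = (1 to 1000).toList.take(100) // left already has Limit(100)
val rightSide = (1 to 1000).toList           // right side is unlimited

// Limit(10) on top of the Union, before and after pushdown:
val before = (leftSide ++ rightSide).take(10)
val after  = (leftSide.take(10) ++ rightSide.take(10)).take(10)

assert(before == after) // same answer, but far fewer rows flow into the Union
```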
* Any operator that a Limit can be pushed past should override the maxRows function.
*
* Note: This rule has to be done when the logical plan is stable;
* otherwise, it could impact the other rules.
I'm not sure what this means?
If we push Limit through Filter, Aggregate and Distinct, the results will be wrong. For example, df.aggregate().limit(1) and df.limit(1).aggregate() will generate different results.
This statement is true iff we can push Limit through some operators. So far, we have not found any eligible operators except outer/left-outer/right-outer Join and Union. Thus, let me revert them. Thanks!
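The Aggregate case can be sketched with plain Scala collections (hypothetical toy data; `sum` stands in for the aggregate function):

```scala
// Hypothetical toy data illustrating why Limit must not be pushed through an
// aggregate: aggregating then limiting is not the same as limiting then
// aggregating.
val df = List(1, 2, 3)
val aggregateThenLimit = List(df.sum).take(1)   // aggregate first
val limitThenAggregate = List(df.take(1).sum)   // limit first
assert(aggregateThenLimit != limitThenAggregate)
```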
After rethinking it: feel free to let me know if the code needs an update. Thank you!
// safe to push down Limit through it. Once we add UNION DISTINCT, we will not be able to
// push down Limit.
case Limit(exp, Union(left, right))
  if left.maxRows.isEmpty || right.maxRows.isEmpty =>
Okay, but why not break this into two parts, so that we push to the left when the left is not limited and push to the right when the right is not limited? Right now you push to both sides if either is not limited.
Yeah, you are right. : )
Should we also check the limit value? If the maxRows is larger than the limit we want to push down, it seems it still makes sense to push it down?
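A minimal sketch of the suggested check (a hypothetical helper, not the PR's actual code): push the Limit into a child only when the child's known maxRows does not already satisfy it.

```scala
// Push down only if the child's row count is unknown, or known to exceed
// the limit being pushed.
def shouldPushDown(limit: Long, childMaxRows: Option[Long]): Boolean =
  childMaxRows.forall(_ > limit)

assert(shouldPushDown(10L, None))        // unlimited child: push
assert(shouldPushDown(10L, Some(100L)))  // child may return 100 rows: push
assert(!shouldPushDown(10L, Some(5L)))   // child already under the limit: skip
```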
Yeah, that also makes sense. Will do the change after these three running test cases. : )
Test build #48435 has finished for PR 10451 at commit
Test build #48431 has finished for PR 10451 at commit
Test build #48433 has finished for PR 10451 at commit
The latest version covers both cases:
Hopefully, you like the latest implementation. : ) @marmbrus @cloud-fan
Test build #48461 has finished for PR 10451 at commit
Test build #48465 has finished for PR 10451 at commit
Let's close the limit push down pull requests. We will need to design this more properly because it is expensive to push down large limits.
Sure, let me close it.
This patch adds a new optimizer rule for performing limit pushdown. Limits will now be pushed down in two cases:

- If a limit is on top of a `UNION ALL` operator, then a partition-local limit operator will be pushed to each of the union operator's children.
- If a limit is on top of an `OUTER JOIN`, then a partition-local limit will be pushed to one side of the join. For `LEFT OUTER` and `RIGHT OUTER` joins, the limit will be pushed to the left and right side, respectively. For `FULL OUTER` join, we will only push limits when at most one of the inputs is already limited: if one input is limited we will push a smaller limit on top of it, and if neither input is limited then we will limit the input which is estimated to be larger.

These optimizations were proposed previously by gatorsmile in #10451 and #10454, but those earlier PRs were closed and deferred for later because at that time Spark's physical `Limit` operator would trigger a full shuffle to perform global limits, so there was a chance that pushdowns could actually harm performance by causing additional shuffles/stages. In #7334, we split the `Limit` operator into separate `LocalLimit` and `GlobalLimit` operators, so we can now push down only local limits (which don't require extra shuffles). This patch is based on both of gatorsmile's patches, with changes and simplifications due to partition-local-limiting.

When we push down the limit, we still keep the original limit in place, so we need a mechanism to ensure that the optimizer rule doesn't keep pattern-matching once the limit has been pushed down. In order to handle this, this patch adds a `maxRows` method to `SparkPlan` which returns the maximum number of rows that the plan can compute, then defines the pushdown rules to only push limits to children if the children's `maxRows` are greater than the limit's `maxRows`. This idea is carried over from #10451; see that patch for additional discussion.
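The `maxRows` guard described above can be sketched with simplified hypothetical plan nodes (not Spark's actual classes, though the merged patch uses similarly named `LocalLimit`/`GlobalLimit` operators): the original limit stays in place, and children are only wrapped while their `maxRows` is unknown or exceeds the limit, which makes the rule idempotent.

```scala
sealed trait Plan { def maxRows: Option[Long] }
case class Scan(rows: Long) extends Plan {
  def maxRows = Some(rows)
}
case class LocalLimit(n: Long, child: Plan) extends Plan {
  def maxRows = Some(child.maxRows.fold(n)(math.min(n, _)))
}
case class GlobalLimit(n: Long, child: Plan) extends Plan {
  def maxRows = Some(child.maxRows.fold(n)(math.min(n, _)))
}
case class UnionAll(children: Seq[Plan]) extends Plan {
  def maxRows =
    if (children.forall(_.maxRows.isDefined)) Some(children.flatMap(_.maxRows).sum)
    else None
}

// Push a LocalLimit to each Union child whose maxRows is unknown or > n.
def pushDown(p: Plan): Plan = p match {
  case GlobalLimit(n, UnionAll(cs)) =>
    GlobalLimit(n, UnionAll(cs.map {
      case c if c.maxRows.forall(_ > n) => LocalLimit(n, c)
      case c                            => c
    }))
  case other => other
}

val plan  = GlobalLimit(5L, UnionAll(Seq(Scan(100L), Scan(3L))))
val once  = pushDown(plan)
val twice = pushDown(once)
assert(once == twice) // the maxRows guard stops repeated pushdown
```

Note how the second child (`Scan(3)`) is left alone: its `maxRows` already satisfies the limit, so wrapping it would be pointless.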
Author: Josh Rosen <[email protected]>
Closes #11121 from JoshRosen/limit-pushdown-2.
"Rule that applies to a Limit on top of a Union. The original Limit won't go away after applying this rule, but additional Limit nodes will be created on top of each child of Union, so that these children produce less rows and Limit can be further optimized for children Relations."
– from https://issues.apache.org/jira/browse/CALCITE-832
Also, the same topic in Hive: https://issues.apache.org/jira/browse/HIVE-11775. This has been merged into Hive.
This PR is for performance improvement. The idea is like predicate pushdown: it can reduce the number of rows processed by `Union All`. After the improvement, we can see the changes in the optimized plan: