[SPARK-12505] [SQL] Pushdown a Limit on top of an Outer-Join #10454
      case _ => f // DO Nothing for the other join types
    }
  }
}
Is it possible that we add an extra limit in every iteration?
You are right, but after this rule there is another rule, CombineLimits, which can combine the extra limits. I think we should still fix it anyway. Thanks!
Can you add an example in the description to illustrate this optimization (and the rationale for it)?
Test build #48252 has finished for PR 10454 at commit
object PushLimitThroughOuterJoin extends Rule[LogicalPlan] with PredicateHelper {

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Limit(expr, Join(left, right, joinType, joinCondition)) =>
We can also push it down through the Join's Projection.
Test build #48263 has finished for PR 10454 at commit
    case f @ Limit(expr, Join(left, right, joinType, joinCondition)) =>
      joinType match {
        case RightOuter =>
          Limit(expr, Join(left, CombineLimits(Limit(expr, right)), joinType, joinCondition))
Need a stop condition to stop pushing Limit if it's already pushed.
Thank you for your review! Since we call CombineLimits here to combine two consecutive Limits, we will not add an extra Limit in the subsequent iteration. Thus, no change will be made to the plan, and RuleExecutor will stop automatically. Am I right?
ah, it's right, nvm
Thank you!
That is right. However, I think checking whether the Limit is already pushed would avoid unnecessarily applying this rule and CombineLimits multiple times. The result should be the same, though, so it is minor.
Test build #48298 has finished for PR 10454 at commit
The new fix contains two parts:
Thank you for your reviews and comments! @yhuai @cloud-fan @viirya
@gatorsmile let's close the limit push down pull requests. We will need to design this more properly because it is expensive to push down large limits.
Sure, let me close it.
This patch adds a new optimizer rule for performing limit pushdown. Limits will now be pushed down in two cases:

- If a limit is on top of a `UNION ALL` operator, then a partition-local limit operator will be pushed to each of the union operator's children.
- If a limit is on top of an `OUTER JOIN` then a partition-local limit will be pushed to one side of the join. For `LEFT OUTER` and `RIGHT OUTER` joins, the limit will be pushed to the left and right side, respectively. For `FULL OUTER` join, we will only push limits when at most one of the inputs is already limited: if one input is limited we will push a smaller limit on top of it and if neither input is limited then we will limit the input which is estimated to be larger.

These optimizations were proposed previously by gatorsmile in #10451 and #10454, but those earlier PRs were closed and deferred for later because at that time Spark's physical `Limit` operator would trigger a full shuffle to perform global limits so there was a chance that pushdowns could actually harm performance by causing additional shuffles/stages. In #7334, we split the `Limit` operator into separate `LocalLimit` and `GlobalLimit` operators, so we can now push down only local limits (which don't require extra shuffles). This patch is based on both of gatorsmile's patches, with changes and simplifications due to partition-local-limiting.

When we push down the limit, we still keep the original limit in place, so we need a mechanism to ensure that the optimizer rule doesn't keep pattern-matching once the limit has been pushed down. In order to handle this, this patch adds a `maxRows` method to `SparkPlan` which returns the maximum number of rows that the plan can compute, then defines the pushdown rules to only push limits to children if the children's maxRows are greater than the limit's maxRows. This idea is carried over from #10451; see that patch for additional discussion.

Author: Josh Rosen <[email protected]>
Closes #11121 from JoshRosen/limit-pushdown-2.
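A minimal sketch of the `maxRows` guard described in this commit message, under stated assumptions: `Node`, `Table`, `LocalLimit`, `OuterJoin`, `JoinKind`, and `LimitPushDownSketch` are hypothetical names for a toy model, and treating a local limit's `maxRows` as simply its limit value is a simplification. It is not the code from #11121.

```scala
// Hypothetical toy model; Spark's real operators and maxRows semantics are richer.
sealed trait JoinKind
case object LeftOuter extends JoinKind
case object RightOuter extends JoinKind
case object Inner extends JoinKind

sealed trait Node { def maxRows: Option[Long] }
case class Table(name: String) extends Node {
  def maxRows: Option[Long] = None // unknown upper bound
}
case class LocalLimit(n: Long, child: Node) extends Node {
  def maxRows: Option[Long] = Some(n) // simplification: the limit is the bound
}
case class OuterJoin(left: Node, right: Node, kind: JoinKind) extends Node {
  def maxRows: Option[Long] = None
}

object LimitPushDownSketch {
  // Add a LocalLimit only when the child is not already bounded by n or fewer rows.
  def maybePush(n: Long, child: Node): Node = child.maxRows match {
    case Some(bound) if bound <= n => child
    case _ => LocalLimit(n, child)
  }

  // The original limit stays in place; only the outer side may gain a new limit.
  def apply(plan: Node): Node = plan match {
    case LocalLimit(n, OuterJoin(l, r, LeftOuter)) =>
      LocalLimit(n, OuterJoin(maybePush(n, l), r, LeftOuter))
    case LocalLimit(n, OuterJoin(l, r, RightOuter)) =>
      LocalLimit(n, OuterJoin(l, maybePush(n, r), RightOuter))
    case other => other
  }
}
```

Because `maybePush` returns the child unchanged once its bound is at most `n`, applying the rule a second time yields an identical plan without needing a separate limit-combining step.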
"Rule that applies to a Limit on top of an OUTER Join. The original Limit won't go away after applying this rule, but additional Limit node(s) will be created on top of the outer-side child (or children if it's a FULL OUTER Join). "
– from https://issues.apache.org/jira/browse/CALCITE-832
The same topic has also been addressed in Hive, where the change has been merged: https://issues.apache.org/jira/browse/HIVE-11684
This PR is a performance improvement. The idea is similar to predicate pushdown: it can reduce the number of rows processed by outer joins. The improvement is significant when the number of rows in the Limit is small but the cost of the Join is large.
For example,
After the improvement, we can see the changes in the optimized plan: