[SPARK-12616] [SQL] Making Logical Operator Union Support Arbitrary Number of Children #10577
@rxin Could you check whether this implementation is what you expect? Thanks!
Maybe we should just remove the old Union and call the new one Union?
Yeah, that would be better. Will make the change tonight. Thanks!
+1, I'd prefer if there is only one operator that performs unions.
 */
object CombineUnions extends Rule[LogicalPlan] {
  private def collectUnionChildren(plan: LogicalPlan): Seq[LogicalPlan] = plan match {
    case Union(l, r) => collectUnionChildren(l) ++ collectUnionChildren(r)
you should write this without using recursion to avoid stack overflow.
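A minimal sketch of what a non-recursive version could look like, using an explicit stack in place of the call stack. The Plan, Relation, and Union types below are toy stand-ins rather than Catalyst's classes, and this is an illustration of the reviewer's suggestion, not the code that was merged.

```scala
// Toy stand-ins for Catalyst's LogicalPlan and the binary Union (assumptions).
sealed trait Plan
final case class Relation(name: String) extends Plan
final case class Union(left: Plan, right: Plan) extends Plan

object UnionFlattening {
  // Collects the non-Union children of a tree of adjacent Unions, left to right.
  // An explicit work list replaces recursion, so depth does not grow the call stack.
  def collectUnionChildren(plan: Plan): Seq[Plan] = {
    var stack: List[Plan] = List(plan)
    val children = Vector.newBuilder[Plan]
    while (stack.nonEmpty) {
      stack.head match {
        case Union(l, r) =>
          // Push left before right so the original child order is preserved.
          stack = l :: r :: stack.tail
        case other =>
          children += other
          stack = stack.tail
      }
    }
    children.result()
  }
}
```

With this shape, a chain of many thousands of nested unions is flattened in a single loop, where the recursive version would need one stack frame per nesting level.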
I see. Removing Union introduces a lot of work, but it is almost done. Will submit a commit tomorrow. Thanks!
Another option would just be to do this at construction time, that way we can avoid paying the cost in the analyzer. This would still limit the cases we could cache (i.e. we'd miss cached data unioned with other data), but that doesn't seem like a huge deal.
I'd leave this rule here either way.
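To illustrate the construction-time option, here is a rough sketch in which the flattening happens inside the helper that builds the union, so no nested unions ever reach the analyzer. Node, Table, NaryUnion, and UnionBuilder.union are hypothetical names for this example only; they are not Spark APIs.

```scala
// Toy plan types for illustration only (assumptions, not Catalyst classes).
sealed trait Node
final case class Table(name: String) extends Node
final case class NaryUnion(children: Seq[Node]) extends Node

object UnionBuilder {
  // Hypothetical smart constructor: merges the children of either side
  // when that side is already an NaryUnion, so the result stays flat.
  def union(left: Node, right: Node): Node = {
    def childrenOf(n: Node): Seq[Node] = n match {
      case NaryUnion(cs) => cs      // already flat by construction
      case other         => Seq(other)
    }
    NaryUnion(childrenOf(left) ++ childrenOf(right))
  }
}
```

Because every union built this way is already flat, one level of unwrapping per side is enough, and no tree traversal is needed.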
+1
Hi @marmbrus, could I ask you a question regarding your comment here? I don't understand the following sentence. Could you give me an example? Thanks!
"i.e. we'd miss cached data unioned with other data"
Major changes in this commit:
Todo:
Thanks!
Test build #48756 has finished for PR 10577 at commit
/**
 * A pattern that collects all adjacent unions and returns their children as a Seq.
 */
object Unions {
I'm not sure I would get rid of this, just use it in your optimization rule.
Sure, I will reimplement it this way.
You don't need to add any new APIs, just call the optimizer rule directly on any existing API that adds a
Understood. Thank you! I will not introduce new APIs.
require(children.forall(_.output.length == children.head.output.length))

val castedTypes: Seq[Option[DataType]] =
  children.tail.foldLeft(children.head.output.map(a => Option(a.dataType))) {
There is a bug in this function. Will fix it tonight. Thanks!
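For readers following the snippet above, the general idea of folding a per-column widened type across all children can be sketched as follows. This is a toy model under stated assumptions: DType, the numeric ranking, and WidenTypes are invented for the example, Spark's real coercion rules are considerably richer, and this is not the fix the author later submitted.

```scala
// Toy type universe for illustration (assumption, not Spark's DataType).
sealed trait DType
case object IntT extends DType
case object LongT extends DType
case object DoubleT extends DType
case object StringT extends DType

object WidenTypes {
  // Toy numeric precedence: a higher rank can represent a lower one.
  private val numericRank: Map[DType, Int] = Map(IntT -> 0, LongT -> 1, DoubleT -> 2)

  // Returns the wider of two types, or None when no common type exists.
  def widen(a: DType, b: DType): Option[DType] = {
    if (a == b) Some(a)
    else for (ra <- numericRank.get(a); rb <- numericRank.get(b))
      yield if (ra >= rb) a else b
  }

  // For each column position, fold the widening over every child's type.
  // Once a column becomes None it stays None for the remaining children.
  def targetTypes(children: Seq[Seq[DType]]): Seq[Option[DType]] = {
    require(children.nonEmpty, "need at least one child")
    require(children.forall(_.length == children.head.length), "column counts differ")
    val start: Seq[Option[DType]] = children.head.map(t => Option(t))
    children.tail.foldLeft(start) { (acc, child) =>
      acc.zip(child).map { case (soFar, t) => soFar.flatMap(widen(_, t)) }
    }
  }
}
```

Carrying an Option per column through the fold means one incompatible child is enough to mark that column as having no common type, while the other columns continue to widen independently.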
Test build #49634 has finished for PR 10577 at commit
Test build #49644 has finished for PR 10577 at commit
    case (e, _) => e
  }
- Project(casted, plan)
+ if (casted.exists(_.isInstanceOf[Alias])) Project(casted, plan) else plan
No need to do this optimization; the Optimizer is smart enough to remove this unnecessary Project.
Sure, let me remove it. : )
LGTM except one minor comment.
Test build #49722 has finished for PR 10577 at commit
The test failure was caused by a PR, which has been reverted.
retest this please.
Test build #49760 has finished for PR 10577 at commit
The latest merge is for resolving the conflicts.
Test build #49792 has finished for PR 10577 at commit
LGTM
Thanks - I'm going to merge this.
Really appreciate your reviews!!! : )
Hi @gatorsmile, could you please explain the comment on the union method? An example would be really helpful.
The cache key is the logical plan. If
@cloud-fan thanks for your quick response. I have a simple test in which a is cached and b is not, and here is the output of the explain method:
It seems the cached a is used. Please bear with my ignorance and correct me if something is wrong here.
it works because
Thanks @cloud-fan !!
The existing Union logical operator only supports two children. Thus, this PR adds a new logical operator Unions, which can have an arbitrary number of children, to replace the existing one.

The Union logical plan is a binary node. However, a typical use case for union is to union a very large number of input sources (DataFrames, RDDs, or files). It is not uncommon to union hundreds of thousands of files. In this case, our optimizer can become very slow due to the large number of logical unions. We should change the Union logical plan to support an arbitrary number of children, and add a single rule in the optimizer to collapse all adjacent Unions into a single Unions. Note that this problem doesn't exist in the physical plan, because the physical Unions already supports an arbitrary number of children.