
[SPARK-12616] [SQL] Making Logical Operator Union Support Arbitrary Number of Children #10577

Closed
wants to merge 58 commits into from

Conversation

gatorsmile
Member

The existing Union logical operator only supports two children. Thus, this PR adds a new logical operator, Unions, which can have an arbitrary number of children, to replace the existing one.

The Union logical plan is a binary node. However, a typical use case for union is to union a very large number of input sources (DataFrames, RDDs, or files). It is not uncommon to union hundreds of thousands of files. In this case, our optimizer can become very slow due to the large number of logical unions. We should change the Union logical plan to support an arbitrary number of children, and add a single rule in the optimizer to collapse all adjacent Unions into a single Unions. Note that this problem doesn't exist in the physical plan, because the physical Union already supports an arbitrary number of children.
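A minimal sketch of the idea, using toy plan types (assumed for illustration, not Spark's actual classes): the old binary operator builds a deep tree when many inputs are unioned, while the new n-ary operator holds all children directly after collapsing adjacent unions.

```scala
// Toy model: a binary Union versus an n-ary Union, and the flattening step.
sealed trait Plan
case class Leaf(name: String) extends Plan
case class BinaryUnion(left: Plan, right: Plan) extends Plan // old: two children
case class Union(children: Seq[Plan]) extends Plan           // new: n children

object CombineUnionsDemo {
  // Collect the leaves of a tree of adjacent binary unions, left to right.
  def flatten(plan: Plan): Seq[Plan] = plan match {
    case BinaryUnion(l, r) => flatten(l) ++ flatten(r)
    case other             => Seq(other)
  }

  def main(args: Array[String]): Unit = {
    // Chained unionAll calls build a left-deep tree: ((t1 U t2) U t3) U t4
    val deep = (1 to 4).map(i => Leaf("t" + i)).reduceLeft[Plan](BinaryUnion(_, _))
    val collapsed = Union(flatten(deep))
    println(collapsed.children.length) // 4
  }
}
```

With thousands of inputs, the optimizer then sees one shallow node instead of a tree whose depth grows with the number of inputs.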

@gatorsmile
Member Author

@rxin Could you check if this implementation is what you expect? Thanks!

@rxin
Contributor

rxin commented Jan 4, 2016

Maybe we should just remove the old Union and call the new one Union?

@gatorsmile
Member Author

Yeah, it will be better. Will do the change tonight. Thanks!

@marmbrus
Contributor

marmbrus commented Jan 4, 2016

+1, I'd prefer it if there were only one operator that performs unions

object CombineUnions extends Rule[LogicalPlan] {
  private def collectUnionChildren(plan: LogicalPlan): Seq[LogicalPlan] = plan match {
    case Union(l, r) => collectUnionChildren(l) ++ collectUnionChildren(r)
Contributor

you should write this without using recursion to avoid stack overflow.
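A hedged sketch of what the reviewer is asking for, again with toy plan types (assumed, not Spark's): an explicit worklist replaces the call stack, so a union tree with hundreds of thousands of nodes cannot overflow the JVM stack.

```scala
// Toy model of a binary union tree.
sealed trait Plan
case class Leaf(name: String) extends Plan
case class Union(left: Plan, right: Plan) extends Plan

object IterativeFlatten {
  // Iterative flattening: a List used as a stack keeps memory on the heap.
  def collectUnionChildren(root: Plan): Seq[Plan] = {
    var stack: List[Plan] = root :: Nil
    val out = scala.collection.mutable.ArrayBuffer[Plan]()
    while (stack.nonEmpty) {
      val head = stack.head
      stack = stack.tail
      head match {
        case Union(l, r) => stack = l :: r :: stack // left first, preserving order
        case leaf        => out += leaf
      }
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    // A left-deep tree far deeper than recursion on the default JVM stack allows.
    val deep = (1 to 200000).map(i => Leaf("t" + i)).reduceLeft[Plan](Union(_, _))
    println(collectUnionChildren(deep).length) // 200000
  }
}
```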

Member Author

I see. Removing Union introduces a lot of work, but almost done. Will submit a commit tomorrow. Thanks!

Contributor

Another option would just be to do this at construction time, that way we can avoid paying the cost in the analyzer. This would still limit the cases we could cache (i.e. we'd miss cached data unioned with other data), but that doesn't seem like a huge deal.

I'd leave this rule here either way.

Contributor

+1

Member Author

To do this at construction time, do we need to introduce a new DataFrame API unionAll that can combine more than two DataFrames? @marmbrus @rxin

Is my understanding correct? Thank you!

Contributor

Hi @marmbrus, could I ask you a question regarding your comment here? I don't understand
the following sentence. Could you give me an example? Thanks!

i.e. we'd miss cached data unioned with other data

@gatorsmile
Member Author

Major changes in this commit:

  • Remove the old binary logical operator node Union.
  • Replace the previous recursion-based combineUnions solution with a solution based on foldLeft.

Todo:

  • Will add the new DataFrame and Dataset APIs for unionAll, if my understanding is correct.
  • Will change the optimizer rule for pushing Filter and Project through Unions.
  • Will rename Unions to Union.

Thanks!
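The foldLeft-based approach mentioned above can be sketched roughly as follows, with toy plan types assumed for illustration (not the actual Spark code): each child of a freshly built Union is inspected once, and nested unions contribute their children directly.

```scala
// Toy model: an n-ary Union over named leaf plans.
sealed trait Plan
case class Leaf(name: String) extends Plan
case class Union(children: Seq[Plan]) extends Plan

object FoldLeftCombine {
  // Flatten one level with foldLeft; applied bottom-up by the optimizer,
  // this leaves no Union directly under another Union.
  def combineUnions(u: Union): Union =
    Union(u.children.foldLeft(Seq.empty[Plan]) {
      case (acc, Union(grandchildren)) => acc ++ grandchildren
      case (acc, child)                => acc :+ child
    })

  def main(args: Array[String]): Unit = {
    val nested = Union(Seq(Union(Seq(Leaf("a"), Leaf("b"))), Leaf("c")))
    println(combineUnions(nested)) // Union(List(Leaf(a), Leaf(b), Leaf(c)))
  }
}
```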

@SparkQA

SparkQA commented Jan 5, 2016

Test build #48756 has finished for PR 10577 at commit c1f66f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
 * A pattern that collects all adjacent unions and returns their children as a Seq.
 */
object Unions {
Contributor

I'm not sure I would get rid of this, just use it in your optimization rule.

Member Author

Sure, will reimplement it that way.

@marmbrus
Contributor

marmbrus commented Jan 5, 2016

Will add the new Dataframe and Dataset APIs for unionAll, if my understanding is correct.

You don't need to add any new APIs, just call the optimizer rule directly on any existing API that adds a Union.

@gatorsmile
Member Author

Understood. Thank you! Will not introduce new APIs.

require(children.forall(_.output.length == children.head.output.length))

val castedTypes: Seq[Option[DataType]] =
  children.tail.foldLeft(children.head.output.map(a => Option(a.dataType))) {
Member Author

There is a bug in this function. Will fix it tonight. Thanks!
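The shape of the castedTypes computation above can be illustrated with a toy example (string "types" and an assumed widening table, standing in for Spark's DataType and coercion rules, which are more involved): fold over the remaining children, widening each column's type pairwise, with None marking columns that have no common type.

```scala
object WidenDemo {
  // Assumed toy widening order: int -> long -> double; equal types always
  // widen to themselves; anything else is incompatible.
  private val order = Seq("int", "long", "double")
  def widen(a: String, b: String): Option[String] =
    if (order.contains(a) && order.contains(b))
      Some(order(math.max(order.indexOf(a), order.indexOf(b))))
    else if (a == b) Some(a)
    else None

  // Per-column common types across all children, in the foldLeft style
  // of the snippet above.
  def castedTypes(childOutputs: Seq[Seq[String]]): Seq[Option[String]] =
    childOutputs.tail.foldLeft(childOutputs.head.map(Option(_))) { (acc, types) =>
      acc.zip(types).map {
        case (Some(t1), t2) => widen(t1, t2)
        case (None, _)      => None
      }
    }

  def main(args: Array[String]): Unit = {
    println(castedTypes(Seq(Seq("int", "string"), Seq("long", "string"))))
    // List(Some(long), Some(string))
  }
}
```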

@gatorsmile gatorsmile changed the title [SPARK-12616] [SQL] Adding a New Logical Operator Unions [SPARK-12616] [SQL] Making Logical Operator Union Support Arbitrary Number of Children Jan 6, 2016
@SparkQA

SparkQA commented Jan 19, 2016

Test build #49634 has finished for PR 10577 at commit f112026.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Union(children: Seq[LogicalPlan]) extends LogicalPlan

@SparkQA

SparkQA commented Jan 19, 2016

Test build #49644 has finished for PR 10577 at commit 4f71741.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case (e, _) => e
}
Project(casted, plan)
if (casted.exists(_.isInstanceOf[Alias])) Project(casted, plan) else plan
Contributor

No need to do this optimization; the Optimizer is smart enough to remove this unnecessary Project.

Member Author

Sure, let me remove it. : )

@cloud-fan
Contributor

LGTM except one minor comment

@SparkQA

SparkQA commented Jan 20, 2016

Test build #49722 has finished for PR 10577 at commit c63f237.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

The test failure was caused by another PR, which has been reverted.

@gatorsmile
Member Author

retest this please.

@SparkQA

SparkQA commented Jan 20, 2016

Test build #49760 has finished for PR 10577 at commit c63f237.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

The latest merge is for resolving the conflicts.

@SparkQA

SparkQA commented Jan 20, 2016

Test build #49792 has finished for PR 10577 at commit c18381e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Union(children: Seq[LogicalPlan]) extends LogicalPlan

@gatorsmile
Member Author

@rxin @marmbrus , could you please review the latest changes? Thank you!

@cloud-fan
Contributor

LGTM

@rxin
Contributor

rxin commented Jan 20, 2016

Thanks - I'm going to merge this.

@asfgit asfgit closed this in 8f90c15 Jan 20, 2016
@gatorsmile
Member Author

Really appreciate your reviews!!! : )

@Huang-yi-3456
Contributor

Hi @gatorsmile, could you please explain the comment in the union method?

def union(other: Dataset[T]): Dataset[T] = withSetOperator {
  // This breaks caching, but it's usually ok because it addresses a very specific use case:
  // using union to union many files or partitions.
  CombineUnions(Union(logicalPlan, other.logicalPlan)).mapChildren(AnalysisBarrier)
}

What does it mean?

This breaks caching

It would be really helpful if you could give me an example.
Thanks very much in advance.

@cloud-fan
Contributor

The cache key is the logical plan. If a is cached, ideally a.union(b) should leverage the cache of a, but we can't do that; it's a tradeoff to keep the logical plan simple.

@Huang-yi-3456
Contributor

Huang-yi-3456 commented Apr 8, 2020

@cloud-fan thanks for your quick response. I ran a simple test in which a is cached and b is not; here is the output of the explain method:

== Parsed Logical Plan ==
Union
:- AnalysisBarrier
:  +- LogicalRDD [number#2, word#3], false
+- AnalysisBarrier
   +- LogicalRDD [number#8, word#9], false

== Analyzed Logical Plan ==
number: int, word: string
Union
:- LogicalRDD [number#2, word#3], false
+- LogicalRDD [number#8, word#9], false

== Optimized Logical Plan ==
Union
:- InMemoryRelation [number#2, word#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
:  +- Scan ExistingRDD[number#2,word#3]
+- LogicalRDD [number#8, word#9], false

== Physical Plan ==
Union
:- InMemoryTableScan [number#2, word#3]
:  +- InMemoryRelation [number#2, word#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
:     +- Scan ExistingRDD[number#2,word#3]
+- Scan ExistingRDD[number#8,word#9]

It seems the cached a is used. Please bear with my ignorance and correct me on what's wrong here.
BTW, the Spark version I use is 2.3.0.
Thanks.

@cloud-fan
Contributor

It works because a is not a union, so this change is a no-op for it. To see the cache miss, cache a.union(b) and then run a.union(b).union(c).
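A minimal sketch of the trade-off being described, with a toy plan type assumed for illustration (Spark's real cache matching works on analyzed logical plans): caching matches on plan equality, and once unions are flattened at construction time, the flattened three-way union contains no subplan equal to the cached two-way union.

```scala
// Toy n-ary union over named inputs.
case class Union(children: Seq[String])

object CacheMissDemo {
  def main(args: Array[String]): Unit = {
    val cached = Union(Seq("a", "b"))       // plan of a.union(b), put in the cache
    val query  = Union(Seq("a", "b", "c"))  // a.union(b).union(c), flattened eagerly
    // The only subplans of `query` are its three leaves, so the cached
    // two-way union never matches and the cache is missed:
    println(query == cached) // false
  }
}
```

Without eager flattening, a.union(b).union(c) would contain the subtree Union(a, b), which would match the cached plan.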

@Huang-yi-3456
Contributor

Thanks @cloud-fan !!
