[SPARK-12616] [SQL] Making Logical Operator Union Support Arbitrary Number of Children #10577

Closed
wants to merge 58 commits into from
Changes from 2 commits
Commits
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
73270c8
added a new logical operator UNIONS
gatorsmile Jan 4, 2016
d9811c7
Merge remote-tracking branch 'upstream/master' into unionAllMultiChil…
gatorsmile Jan 4, 2016
5d031a7
remove the old operator union
gatorsmile Jan 4, 2016
c1f66f7
remove the old operator union #2.
gatorsmile Jan 5, 2016
c1dcd02
rename.
gatorsmile Jan 5, 2016
51ad5b2
address the comments.
gatorsmile Jan 6, 2016
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
5681ca8
Merge branch 'unionAllMultiChildren' into unionAllMC
gatorsmile Jan 6, 2016
7a54c8f
Change the optimizer rule for pushing Filter and Project through new …
gatorsmile Jan 6, 2016
95e2349
refactored WidenSetOperationTypes and added test cases
gatorsmile Jan 6, 2016
6a6003e
addressed comments.
gatorsmile Jan 6, 2016
15ec058
replace list by arrayBuffer in combineUnions
gatorsmile Jan 6, 2016
5e06647
address comments.
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
b821af0
Merge branch 'unionAllMC' into unionAllMCMerged
gatorsmile Jan 6, 2016
2229932
move changes in HiveQI.scala to CatalystQI.scala
gatorsmile Jan 6, 2016
b3327b1
add lazy.
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
723c0da
resolve comments.
gatorsmile Jan 7, 2016
42b81a8
resolve comments.
gatorsmile Jan 8, 2016
4e0387f
Merge remote-tracking branch 'upstream/master' into unionAllMCMerged
gatorsmile Jan 8, 2016
b03d813
Merge remote-tracking branch 'upstream/master' into unionAllMCMerged
gatorsmile Jan 8, 2016
ab732c1
Remove the unneeded parm.
gatorsmile Jan 8, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
7320e21
Merge branch 'unionAllMCMerged' into unionAllMCMergedNew
gatorsmile Jan 9, 2016
741371a
Merge remote-tracking branch 'upstream/master' into unionAllMCMergedNew
gatorsmile Jan 9, 2016
031a5d8
changed the implementation of Union in sql generation
gatorsmile Jan 9, 2016
f3d23dc
fixed the implementation of Union in sql generation
gatorsmile Jan 9, 2016
a56e595
Merge remote-tracking branch 'upstream/master' into unionAllMCMergedNew
gatorsmile Jan 13, 2016
abfcf93
address comments.
gatorsmile Jan 14, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
e8e19a1
Merge branch 'unionAllMCMergedNew' into unionAllMCMergedNewNew
gatorsmile Jan 14, 2016
3b13ddf
address comments.
gatorsmile Jan 15, 2016
b88bdeb
added a comment.
gatorsmile Jan 15, 2016
3041864
address comments.
gatorsmile Jan 16, 2016
6259fd9
reimplement it based on the latest change.
gatorsmile Jan 18, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
f112026
Merge branch 'unionAllMCMergedNewNew' into unionAllMCMergedNewNewNew
gatorsmile Jan 18, 2016
4f71741
address comments.
gatorsmile Jan 19, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
59b5895
Merge branch 'unionAllMCMergedNewNewNew' into unionAllMCMergedNewNewN…
gatorsmile Jan 19, 2016
c63f237
address comments.
gatorsmile Jan 19, 2016
2e8562d
Merge remote-tracking branch 'upstream/master' into unionAllMCMergedN…
gatorsmile Jan 20, 2016
a571998
Merge remote-tracking branch 'upstream/master' into unionAllMCMergedN…
gatorsmile Jan 20, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
c18381e
Merge branch 'unionAllMCMergedNewNewNewNew' into unionAllMCMergedNewN…
gatorsmile Jan 20, 2016
@@ -40,6 +40,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
     Batch("Aggregate", FixedPoint(100),
       ReplaceDistinctWithAggregate,
       RemoveLiteralFromGroupExpressions) ::
+    Batch("Unions", FixedPoint(100),
+      CombineUnions) ::
Contributor
You should explain in a comment why CombineUnions appears twice.

Member Author
Sure, will do.

Contributor
and maybe move this before aggregate?

Member Author
Sure.

     Batch("Operator Optimizations", FixedPoint(100),
       // Operator push down
       SetOperationPushDown,
@@ -54,6 +56,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       ProjectCollapsing,
       CombineFilters,
       CombineLimits,
+      CombineUnions,
       // Constant folding
       NullPropagation,
       OptimizeIn,
@@ -594,6 +597,22 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
   }
 }

+/**
+ * Combines all adjacent [[Union]] and [[Unions]] operators into a single [[Unions]].
+ */
+object CombineUnions extends Rule[LogicalPlan] {
+  private def collectUnionChildren(plan: LogicalPlan): Seq[LogicalPlan] = plan match {
+    case Union(l, r) => collectUnionChildren(l) ++ collectUnionChildren(r)
Contributor
you should write this without using recursion to avoid stack overflow.

Member Author
I see. Removing Union introduces a lot of work, but almost done. Will submit a commit tomorrow. Thanks!

Contributor
Another option would just be to do this at construction time, that way we can avoid paying the cost in the analyzer. This would still limit the cases we could cache (i.e. we'd miss cached data unioned with other data), but that doesn't seem like a huge deal.

I'd leave this rule here either way.

Contributor
+1

Member Author
To do this at construction time, would we need to introduce a new DataFrame API unionAll that can combine more than two DataFrames? @marmbrus @rxin

Is my understanding correct? Thank you!

Contributor
Hi @marmbrus Could I ask you a question regarding your comment here? I don't understand the following sentence. Could you give me an example? Thanks!

i.e. we'd miss cached data unioned with other data

+    case Unions(children) => children.flatMap(collectUnionChildren)
+    case other => other :: Nil
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    case u: Union => Unions(collectUnionChildren(u))
+    case u: Unions => Unions(collectUnionChildren(u))
+  }
+}
+
 /**
  * Combines two adjacent [[Filter]] operators into one, merging the
  * conditions into one conjunctive predicate.
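
The review request to collect union children without recursion can be illustrated with an explicit work list. The following is a minimal sketch only, not the code that was eventually merged: `Plan`, `Relation`, and the simplified `Union`/`Unions` case classes are hypothetical stand-ins for Catalyst's logical plan nodes, and `flattenUnions` is an illustrative name.

```scala
import scala.collection.mutable.ArrayBuffer

object FlattenUnionsSketch {
  // Hypothetical stand-ins for Catalyst's logical plan nodes (assumption, not Spark's classes).
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Union(left: Plan, right: Plan) extends Plan
  case class Unions(children: Seq[Plan]) extends Plan

  // Collects the non-union leaves under adjacent Union/Unions nodes.
  // An explicit work list replaces recursion, so a deeply nested chain
  // of unions cannot overflow the call stack.
  def flattenUnions(root: Plan): Seq[Plan] = {
    var pending: List[Plan] = root :: Nil
    val flattened = ArrayBuffer.empty[Plan]
    while (pending.nonEmpty) {
      val current = pending.head
      pending = pending.tail
      current match {
        // Prepend left before right so children keep their left-to-right order.
        case Union(l, r) => pending = l :: r :: pending
        case Unions(children) => pending = children.toList ::: pending
        case other => flattened += other
      }
    }
    flattened.toList
  }

  def main(args: Array[String]): Unit = {
    val t = Relation("testData")
    // Same shape as testData.unionAll(testData)... applied four times.
    val nested = Union(Union(Union(Union(t, t), t), t), t)
    println(flattenUnions(nested).size) // prints 5
  }
}
```

Prepending children to the head of the list keeps the traversal depth-first and order-preserving, which matters because the position of each child in a union determines the order of the output rows.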
@@ -169,18 +169,3 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
     case _ => None
   }
 }
-
-/**
- * A pattern that collects all adjacent unions and returns their children as a Seq.
- */
-object Unions {
-  def unapply(plan: LogicalPlan): Option[Seq[LogicalPlan]] = plan match {
-    case u: Union => Some(collectUnionChildren(u))
-    case _ => None
-  }
-
-  private def collectUnionChildren(plan: LogicalPlan): Seq[LogicalPlan] = plan match {
-    case Union(l, r) => collectUnionChildren(l) ++ collectUnionChildren(r)
-    case other => other :: Nil
-  }
-}
@@ -107,6 +107,13 @@ private[sql] object SetOperation {
   def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right))
 }

+case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right)
+
+case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
+  /** We don't use right.output because those rows get excluded from the set. */
+  override def output: Seq[Attribute] = left.output
+}
+
 case class Union(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {

   override def statistics: Statistics = {
@@ -115,11 +122,20 @@ case class Union(left: LogicalPlan, right: LogicalPlan) extends SetOperation(lef
   }
 }

-case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right)
+case class Unions(children: Seq[LogicalPlan]) extends LogicalPlan {

-case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
-  /** We don't use right.output because those rows get excluded from the set. */
-  override def output: Seq[Attribute] = left.output
+  override def output: Seq[Attribute] = {
+    children.tail.foldLeft(children.head.output) { case (currentOutput, child) =>
Contributor
I'd also add a comment here explaining what's going on. At a high level, you are just updating nullability, right?

Member Author
yeah

+      currentOutput.zip(child.output).map { case (a1, a2) =>
+        a1.withNullability(a1.nullable || a2.nullable)
+      }
Contributor
how about

children.map(_.output).transpose.map(attrs => attrs.head.withNullability(attrs.exists(_.nullable)))

Member Author
Thank you!

+    }
+  }
+
+  override def statistics: Statistics = {
+    val sizeInBytes = children.map(_.statistics.sizeInBytes).sum
+    Statistics(sizeInBytes = sizeInBytes)
+  }
 }

 case class Join(
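
The two ways of computing the output nullability discussed in the review thread, the fold over children and the `transpose` one-liner, can be compared in isolation. This is a minimal sketch under stated assumptions: `Attr` is a hypothetical stand-in for Catalyst's `Attribute` that models only a name and a nullability flag, and both function names are illustrative.

```scala
object UnionOutputSketch {
  // Hypothetical stand-in for Catalyst's Attribute (assumption): name and nullability only.
  case class Attr(name: String, nullable: Boolean) {
    def withNullability(n: Boolean): Attr = copy(nullable = n)
  }

  // Fold-based version, mirroring the shape of the code under review:
  // start from the first child's output and widen nullability child by child.
  def outputByFold(childOutputs: Seq[Seq[Attr]]): Seq[Attr] =
    childOutputs.tail.foldLeft(childOutputs.head) { (current, next) =>
      current.zip(next).map { case (a1, a2) =>
        a1.withNullability(a1.nullable || a2.nullable)
      }
    }

  // Transpose-based version, following the reviewer's suggestion:
  // group the i-th attribute of every child, then mark the column nullable
  // if any child's attribute in that position is nullable.
  def outputByTranspose(childOutputs: Seq[Seq[Attr]]): Seq[Attr] =
    childOutputs.transpose.map { attrs =>
      attrs.head.withNullability(attrs.exists(_.nullable))
    }

  def main(args: Array[String]): Unit = {
    val outputs = Seq(
      Seq(Attr("a", nullable = false), Attr("b", nullable = false)),
      Seq(Attr("d", nullable = true), Attr("e", nullable = false)))
    println(outputByFold(outputs) == outputByTranspose(outputs)) // prints true
  }
}
```

Both versions keep the attribute names of the first child and assume at least one child. One difference worth noting: `zip` silently truncates when children have different numbers of columns, whereas `transpose` throws, so the two only agree on inputs where every child has the same arity.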
@@ -31,7 +31,9 @@ class SetOperationPushDownSuite extends PlanTest {
         EliminateSubQueries) ::
       Batch("Union Pushdown", Once,
         SetOperationPushDown,
-        SimplifyFilters) :: Nil
+        SimplifyFilters) ::
+      Batch("Unions", Once,
+        CombineUnions) :: Nil
   }

   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -40,6 +42,20 @@ class SetOperationPushDownSuite extends PlanTest {
   val testIntersect = Intersect(testRelation, testRelation2)
   val testExcept = Except(testRelation, testRelation2)

+  test("union: combine unions into one unions") {
Contributor
I don't think this is really a push-down test. How about we create a new test suite for Union?

Contributor
oh, maybe just rename this test suite to SetOperationSuite

Member Author
ok, sure, will rename it.

+    val unionQuery1 = Union(Union(testRelation, testRelation2), testRelation)
+    val unionQuery2 = Union(testRelation, Union(testRelation2, testRelation))
+    val unionOptimized1 = Optimize.execute(unionQuery1.analyze)
+    val unionOptimized2 = Optimize.execute(unionQuery2.analyze)
+    comparePlans(unionOptimized1, unionOptimized2)
+
+    val combinedUnions = Unions(unionOptimized1 :: unionOptimized2 :: Nil)
+    val combinedUnionsOptimized = Optimize.execute(combinedUnions.analyze)
+    val unionQuery3 = Union(unionQuery1, unionQuery2)
+    val unionOptimized3 = Optimize.execute(unionQuery3.analyze)
+    comparePlans(combinedUnionsOptimized, unionOptimized3)
+  }
+
   test("union/intersect/except: filter to each side") {
     val unionQuery = testUnion.where('a === 1)
     val intersectQuery = testIntersect.where('b < 10)
@@ -50,7 +66,7 @@ class SetOperationPushDownSuite extends PlanTest {
     val exceptOptimized = Optimize.execute(exceptQuery.analyze)

     val unionCorrectAnswer =
-      Union(testRelation.where('a === 1), testRelation2.where('d === 1)).analyze
+      Unions(testRelation.where('a === 1) :: testRelation2.where('d === 1) :: Nil).analyze
     val intersectCorrectAnswer =
       Intersect(testRelation.where('b < 10), testRelation2.where('e < 10)).analyze
     val exceptCorrectAnswer =
@@ -65,7 +81,7 @@ class SetOperationPushDownSuite extends PlanTest {
     val unionQuery = testUnion.select('a)
     val unionOptimized = Optimize.execute(unionQuery.analyze)
     val unionCorrectAnswer =
-      Union(testRelation.select('a), testRelation2.select('d)).analyze
+      Unions(testRelation.select('a) :: testRelation2.select('d) :: Nil ).analyze
     comparePlans(unionOptimized, unionCorrectAnswer)
   }

@@ -347,7 +347,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         LocalTableScan(output, data) :: Nil
       case logical.Limit(IntegerLiteral(limit), child) =>
         execution.Limit(limit, planLater(child)) :: Nil
-      case Unions(unionChildren) =>
+      case logical.Unions(unionChildren) =>
         execution.Union(unionChildren.map(planLater)) :: Nil
       case logical.Except(left, right) =>
         execution.Except(planLater(left), planLater(right)) :: Nil
@@ -25,7 +25,7 @@ import scala.util.Random
 import org.scalatest.Matchers._

 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.catalyst.plans.logical.{Unions, OneRowRelation}
 import org.apache.spark.sql.execution.Exchange
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
 import org.apache.spark.sql.functions._
@@ -98,6 +98,19 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       testData.collect().toSeq)
   }

+  test ("union all") {
Contributor
remove the space after test

Member Author
Done. : )

+    val unionsDF = testData.unionAll(testData).unionAll(testData)
+      .unionAll(testData).unionAll(testData)
+
+    assert(unionsDF.queryExecution.optimizedPlan.collect {
+      case j @ Unions(Seq(_, _, _, _, _)) => j }.size === 1)
+
+    checkAnswer(
+      unionsDF.agg(avg('key), max('key), min('key), sum('key)),
+      Row(50.5, 100, 1, 25250) :: Nil
+    )
+  }
+
   test("empty data frame") {
     assert(sqlContext.emptyDataFrame.columns.toSeq === Seq.empty[String])
     assert(sqlContext.emptyDataFrame.count() === 0)
@@ -51,18 +51,6 @@ class PlannerSuite extends SharedSQLContext {
       s"The plan of query $query does not have partial aggregations.")
   }

-  test("unions are collapsed") {
-    val planner = sqlContext.planner
-    import planner._
-    val query = testData.unionAll(testData).unionAll(testData).logicalPlan
-    val planned = BasicOperators(query).head
-    val logicalUnions = query collect { case u: logical.Union => u }
-    val physicalUnions = planned collect { case u: execution.Union => u }
-
-    assert(logicalUnions.size === 2)
-    assert(physicalUnions.size === 1)
-  }
-
   test("count is partially aggregated") {
     val query = testData.groupBy('value).agg(count('key)).queryExecution.analyzed
     testPartialAggregationPlan(query)