[SPARK-12616] [SQL] Making Logical Operator Union Support Arbitrary Number of Children #10577
@@ -66,7 +66,8 @@ class Analyzer(
   lazy val batches: Seq[Batch] = Seq(
     Batch("Substitution", fixedPoint,
       CTESubstitution,
-      WindowsSubstitution),
+      WindowsSubstitution,
+      EliminateUnions),
     Batch("Resolution", fixedPoint,
       ResolveRelations ::
       ResolveReferences ::
@@ -1170,6 +1171,15 @@ object EliminateSubQueries extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Removes [[Union]] operators from the plan if it just has one child.
+ */
+object EliminateUnions extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Union(children) if children.size == 1 => children.head
+  }
+}
+
 /**
  * Cleans up unnecessary Aliases inside the plan. Basically we only need Alias as a top level
  * expression in Project(project list) or Aggregate(aggregate expressions) or

Comment (on the case Union(children) line): nit: can be …
Reply: That's actually only going to match when the …
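To make the rule's effect concrete, here is a minimal standalone sketch of collapsing a one-child union. The Plan, Scan, and Union types below are hypothetical stand-ins for Catalyst's LogicalPlan hierarchy, and the Seq pattern in the first case is equivalent to the children.size == 1 guard above:

object EliminateUnionsSketch extends App {
  // Sketch only: a toy plan tree, not Catalyst's.
  sealed trait Plan
  case class Scan(table: String) extends Plan
  case class Union(children: Seq[Plan]) extends Plan

  def eliminateUnions(plan: Plan): Plan = plan match {
    // A Union with exactly one child is a no-op: replace it with that child.
    case Union(Seq(child)) => eliminateUnions(child)
    case Union(cs)         => Union(cs.map(eliminateUnions))
    case other             => other
  }

  assert(eliminateUnions(Union(Seq(Scan("t")))) == Scan("t"))
  assert(eliminateUnions(Union(Seq(Scan("a"), Scan("b")))) ==
    Union(Seq(Scan("a"), Scan("b")))) // multi-child unions are untouched
}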
@@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
 import javax.annotation.Nullable

+import scala.annotation.tailrec
+import scala.collection.mutable
+
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -27,7 +30,7 @@ import org.apache.spark.sql.types._

 /**
- * A collection of [[Rule Rules]] that can be used to coerce differing types that participate in
+ * A collection of [[Rule]] that can be used to coerce differing types that participate in
  * operations into compatible ones.
  *
  * Most of these rules are based on Hive semantics, but they do not introduce any dependencies on
@@ -219,31 +222,59 @@ object HiveTypeCoercion {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case p if p.analyzed => p

-      case s @ SetOperation(left, right) if s.childrenResolved
-        && left.output.length == right.output.length && !s.resolved =>
-        // Tracks the list of data types to widen.
-        // Some(dataType) means the right-hand side and the left-hand side have different types,
-        // and there is a target type to widen both sides to.
-        val targetTypes: Seq[Option[DataType]] = left.output.zip(right.output).map {
-          case (lhs, rhs) if lhs.dataType != rhs.dataType =>
-            findWiderTypeForTwo(lhs.dataType, rhs.dataType)
-          case other => None
-        }
-
-        if (targetTypes.exists(_.isDefined)) {
-          // There is at least one column to widen.
-          s.makeCopy(Array(widenTypes(left, targetTypes), widenTypes(right, targetTypes)))
-        } else {
-          // If we cannot find any column to widen, then just return the original set.
-          s
-        }
+      case s @ SetOperation(left, right) if s.childrenResolved &&
+          left.output.length == right.output.length && !s.resolved =>
+        val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
+        assert(newChildren.length == 2)
+        s.makeCopy(Array(newChildren.head, newChildren.last))
+
+      case s: Union if s.childrenResolved &&
+          s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved =>
+        val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children)
+        s.makeCopy(Array(newChildren))
     }

+    /** Build new children with the widest types for each attribute among all the children */
+    private def buildNewChildrenWithWiderTypes(children: Seq[LogicalPlan]): Seq[LogicalPlan] = {
+      require(children.forall(_.output.length == children.head.output.length))
+
+      // Get a sequence of data types, each of which is the widest type of this specific attribute
+      // in all the children
+      val targetTypes: Seq[DataType] =
+        getWidestTypes(children, attrIndex = 0, mutable.Queue[DataType]())
+
+      if (targetTypes.nonEmpty) {
+        // Add an extra Project if the targetTypes are different from the original types.
+        children.map(widenTypes(_, targetTypes))
+      } else {
+        // Unable to find a target type to widen, then just return the original set.
+        children
+      }
+    }
+
+    /** Get the widest type for each attribute in all the children */
+    @tailrec private def getWidestTypes(
+        children: Seq[LogicalPlan],
+        attrIndex: Int,
+        castedTypes: mutable.Queue[DataType]): Seq[DataType] = {
+      // Return the result after the widen data types have been found for all the children
+      if (attrIndex >= children.head.output.length) return castedTypes.toSeq
+
+      // For the attrIndex-th attribute, find the widest type
+      findWiderCommonType(children.map(_.output(attrIndex).dataType)) match {
+        // If unable to find an appropriate widen type for this column, return an empty Seq
+        case None => Seq.empty[DataType]
+        // Otherwise, record the result in the queue and find the type for the next column
+        case Some(widenType) =>
+          castedTypes.enqueue(widenType)
+          getWidestTypes(children, attrIndex + 1, castedTypes)
+      }
+    }

     /** Given a plan, add an extra project on top to widen some columns' data types. */
-    private def widenTypes(plan: LogicalPlan, targetTypes: Seq[Option[DataType]]): LogicalPlan = {
+    private def widenTypes(plan: LogicalPlan, targetTypes: Seq[DataType]): LogicalPlan = {
       val casted = plan.output.zip(targetTypes).map {
-        case (e, Some(dt)) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
+        case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
         case (e, _) => e
       }
       Project(casted, plan)

Comment (on assert(newChildren.length == 2)): Is this necessary? Looks like it is impossible to have different length here.
Reply: Yeah, it is impossible now, but this relies on the implementation of …
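The per-column widening that buildNewChildrenWithWiderTypes and getWidestTypes perform can be illustrated with a self-contained sketch. The toy type lattice and all names below are hypothetical, not Catalyst's; note also that the PR's getWidestTypes signals failure with an empty Seq, whereas this sketch uses Option:

object WidenTypesSketch extends App {
  // Sketch only: a toy lattice (IntT < LongT < DoubleT, StringT incompatible)
  // standing in for Catalyst's DataType promotion rules.
  sealed trait DType
  case object IntT extends DType
  case object LongT extends DType
  case object DoubleT extends DType
  case object StringT extends DType

  private val numericOrder = Seq(IntT, LongT, DoubleT)

  // Analogue of findWiderTypeForTwo: the wider of two types, if one exists.
  def widerOfTwo(a: DType, b: DType): Option[DType] =
    if (a == b) Some(a)
    else (numericOrder.indexOf(a), numericOrder.indexOf(b)) match {
      case (i, j) if i >= 0 && j >= 0 => Some(numericOrder(math.max(i, j)))
      case _ => None // e.g. IntT vs StringT: no common wider type
    }

  // Analogue of findWiderCommonType: fold widerOfTwo over all the types.
  def widerCommonType(types: Seq[DType]): Option[DType] =
    types.foldLeft(Option(types.head)) {
      case (Some(acc), t) => widerOfTwo(acc, t)
      case (None, _)      => None
    }

  // Analogue of getWidestTypes: one widest type per column, across all children.
  def widestTypes(childSchemas: Seq[Seq[DType]]): Option[Seq[DType]] = {
    val numCols = childSchemas.head.length
    require(childSchemas.forall(_.length == numCols)) // mirrors the require in the PR
    val perColumn = (0 until numCols).map(i => widerCommonType(childSchemas.map(_(i))))
    if (perColumn.forall(_.isDefined)) Some(perColumn.map(_.get)) else None
  }

  // Three union children with schemas (Int, Int), (Long, Double), (Int, Long)
  // widen column-wise to (Long, Double); a String column would make this fail.
  println(widestTypes(Seq(Seq(IntT, IntT), Seq(LongT, DoubleT), Seq(IntT, LongT))))
}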
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueri
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -45,6 +45,13 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
     //////////////////////////////////////////////////////////////////////////////////////////
     // Optimizer rules start here
     //////////////////////////////////////////////////////////////////////////////////////////
+    // - Do the first call of CombineUnions before starting the major Optimizer rules,
+    //   since it can reduce the number of iterations, and the other rules could add/move
+    //   extra operators between two adjacent Union operators.
+    // - Call CombineUnions again in Batch("Operator Optimizations"),
+    //   since the other rules might make two separate Union operators adjacent.
+    Batch("Union", Once,
+      CombineUnions) ::
     Batch("Aggregate", FixedPoint(100),
       ReplaceDistinctWithAggregate,
       RemoveLiteralFromGroupExpressions) ::

Comment: Do you have a case that we must call …
Reply: I think this is done for performance.
Reply: (we should document it inline)
Reply: I will also mention the performance issue in the comment. @cloud-fan #10451 This PR will add extra …
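A rough sketch of the Once vs. FixedPoint batch semantics that the inline comment relies on, assuming a simplified rule executor rather than Catalyst's RuleExecutor: a cheap one-shot normalization pass before the iterative batches shrinks the plan they have to iterate over.

object BatchSketch extends App {
  // Sketch only: Plan and Rule are hypothetical stand-ins for
  // LogicalPlan and Rule[LogicalPlan].
  type Plan = String
  type Rule = Plan => Plan

  // A Once batch applies its rules a single time.
  def runOnce(rules: Seq[Rule], plan: Plan): Plan =
    rules.foldLeft(plan)((p, r) => r(p))

  // A FixedPoint batch reapplies its rules until the plan stops
  // changing, or an iteration cap is hit.
  def runToFixedPoint(rules: Seq[Rule], plan: Plan, maxIters: Int = 100): Plan = {
    var current = plan
    var changed = true
    var iters = 0
    while (changed && iters < maxIters) {
      val next = runOnce(rules, current)
      changed = next != current
      current = next
      iters += 1
    }
    current
  }

  // A toy rewrite standing in for CombineUnions: each application shrinks
  // the plan, and the fixed point is reached when nothing changes.
  val collapse: Rule = _.replace("aa", "a")
  println(runToFixedPoint(Seq(collapse), "aaaa")) // "a"
}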
@@ -62,6 +69,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       ProjectCollapsing,
       CombineFilters,
       CombineLimits,
+      CombineUnions,
       // Constant folding and strength reduction
       NullPropagation,
       OptimizeIn,
@@ -138,11 +146,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   /**
    * Maps Attributes from the left side to the corresponding Attribute on the right side.
    */
-  private def buildRewrites(bn: BinaryNode): AttributeMap[Attribute] = {
-    assert(bn.isInstanceOf[Union] || bn.isInstanceOf[Intersect] || bn.isInstanceOf[Except])
-    assert(bn.left.output.size == bn.right.output.size)
-
-    AttributeMap(bn.left.output.zip(bn.right.output))
+  private def buildRewrites(left: LogicalPlan, right: LogicalPlan): AttributeMap[Attribute] = {
+    assert(left.output.size == right.output.size)
+    AttributeMap(left.output.zip(right.output))
   }
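What buildRewrites produces, sketched standalone: a positional map from the first child's output attributes to another child's. Attr and the plain Map below are hypothetical stand-ins for Catalyst's Attribute and AttributeMap.

object RewriteSketch extends App {
  // exprId disambiguates same-named columns coming from different children.
  case class Attr(name: String, exprId: Int)

  def buildRewrites(left: Seq[Attr], right: Seq[Attr]): Map[Attr, Attr] = {
    assert(left.size == right.size) // union children must agree on column count
    left.zip(right).toMap            // positional, not by-name, mapping
  }

  val child1 = Seq(Attr("a", 1), Attr("b", 2))
  val child2 = Seq(Attr("a", 7), Attr("b", 8)) // same names, different exprIds

  val rewrites = buildRewrites(child1, child2)
  // A predicate on child1's "a" (exprId 1) becomes one on child2's "a"
  // (exprId 7), which is what pushToRight does to each pushed-down expression.
  println(rewrites(Attr("a", 1))) // Attr(a,7)
}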
@@ -176,32 +182,38 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   }

   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // Push down filter into union
-    case Filter(condition, u @ Union(left, right)) =>
-      val (deterministic, nondeterministic) = partitionByDeterministic(condition)
-      val rewrites = buildRewrites(u)
-      Filter(nondeterministic,
-        Union(
-          Filter(deterministic, left),
-          Filter(pushToRight(deterministic, rewrites), right)
-        )
-      )
-
     // Push down deterministic projection through UNION ALL
-    case p @ Project(projectList, u @ Union(left, right)) =>
+    case p @ Project(projectList, Union(children)) =>
+      assert(children.nonEmpty)
       if (projectList.forall(_.deterministic)) {
-        val rewrites = buildRewrites(u)
-        Union(
-          Project(projectList, left),
-          Project(projectList.map(pushToRight(_, rewrites)), right))
+        val newFirstChild = Project(projectList, children.head)
+        val newOtherChildren = children.tail.map { child =>
+          val rewrites = buildRewrites(children.head, child)
+          Project(projectList.map(pushToRight(_, rewrites)), child)
+        }
+        Union(newFirstChild +: newOtherChildren)
       } else {
         p
       }

+    // Push down filter into union
+    case Filter(condition, Union(children)) =>
+      assert(children.nonEmpty)
+      val (deterministic, nondeterministic) = partitionByDeterministic(condition)
+      val newFirstChild = Filter(deterministic, children.head)
+      val newOtherChildren = children.tail.map { child =>
+        val rewrites = buildRewrites(children.head, child)
+        Filter(pushToRight(deterministic, rewrites), child)
+      }
+      Filter(nondeterministic, Union(newFirstChild +: newOtherChildren))
+
     // Push down filter through INTERSECT
-    case Filter(condition, i @ Intersect(left, right)) =>
+    case Filter(condition, Intersect(left, right)) =>
       val (deterministic, nondeterministic) = partitionByDeterministic(condition)
-      val rewrites = buildRewrites(i)
+      val rewrites = buildRewrites(left, right)
       Filter(nondeterministic,
         Intersect(
           Filter(deterministic, left),
@@ -210,9 +222,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       )

     // Push down filter through EXCEPT
-    case Filter(condition, e @ Except(left, right)) =>
+    case Filter(condition, Except(left, right)) =>
       val (deterministic, nondeterministic) = partitionByDeterministic(condition)
-      val rewrites = buildRewrites(e)
+      val rewrites = buildRewrites(left, right)
       Filter(nondeterministic,
         Except(
           Filter(deterministic, left),
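The new multi-child Filter case can be sketched on a toy IR. This sketch assumes a fully deterministic condition and elides the pushToRight rewriting, which the real rule needs because each union child has its own attribute ids; the Plan types are hypothetical stand-ins.

object PushdownSketch extends App {
  sealed trait Plan
  case class Scan(name: String) extends Plan
  case class Filter(cond: String, child: Plan) extends Plan
  case class Union(children: Seq[Plan]) extends Plan

  def pushFilterThroughUnion(plan: Plan): Plan = plan match {
    case Filter(cond, Union(children)) =>
      // The real rule splits cond into deterministic/nondeterministic parts
      // and keeps the nondeterministic part above the Union; here the whole
      // condition is assumed deterministic, so everything is pushed down.
      Union(children.map(Filter(cond, _)))
    case other => other
  }

  println(pushFilterThroughUnion(
    Filter("x > 1", Union(Seq(Scan("t1"), Scan("t2"), Scan("t3"))))))
  // Union(List(Filter(x > 1,Scan(t1)), Filter(x > 1,Scan(t2)), Filter(x > 1,Scan(t3))))
}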
@@ -662,6 +674,15 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
   }
 }

+/**
+ * Combines all adjacent [[Union]] operators into a single [[Union]].
+ */
+object CombineUnions extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Unions(children) => Union(children)
+  }
+}
+
 /**
  * Combines two adjacent [[Filter]] operators into one, merging the
  * conditions into one conjunctive predicate.
@@ -17,6 +17,9 @@
 package org.apache.spark.sql.catalyst.planning

+import scala.annotation.tailrec
+import scala.collection.mutable
+
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -170,17 +173,29 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
   }
 }

Comment: I'm not sure I would get rid of this, just use it in your optimization rule.
Reply: Sure, will reimplement it using this way.

 /**
  * A pattern that collects all adjacent unions and returns their children as a Seq.
  */
 object Unions {
   def unapply(plan: LogicalPlan): Option[Seq[LogicalPlan]] = plan match {
-    case u: Union => Some(collectUnionChildren(u))
+    case u: Union => Some(collectUnionChildren(mutable.Stack(u), Seq.empty[LogicalPlan]))
     case _ => None
   }

-  private def collectUnionChildren(plan: LogicalPlan): Seq[LogicalPlan] = plan match {
-    case Union(l, r) => collectUnionChildren(l) ++ collectUnionChildren(r)
-    case other => other :: Nil
-  }
+  // Doing a depth-first tree traversal to combine all the union children.
+  @tailrec
+  private def collectUnionChildren(
+      plans: mutable.Stack[LogicalPlan],
+      children: Seq[LogicalPlan]): Seq[LogicalPlan] = {
+    if (plans.isEmpty) children
+    else {
+      plans.pop match {
+        case Union(grandchildren) =>
+          grandchildren.reverseMap(plans.push(_))
+          collectUnionChildren(plans, children)
+        case other => collectUnionChildren(plans, children :+ other)
+      }
+    }
+  }
 }

Comment (on the reverseMap/push lines): instead of reverse here, why not just use …
Reply: Using a stack is for doing a depth-first tree traversal. For example, the users might expect the order of unions in the following two cases to be the same? Or they might not care?
Reply: Order does matter, you are right.
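The same traversal, sketched standalone with an immutable List as the stack: prepending grandchildren keeps left-to-right order without the explicit reverse that mutable.Stack.push requires, while staying tail-recursive for arbitrarily deep nesting. The mini-IR types here are hypothetical.

object FlattenSketch extends App {
  sealed trait Plan
  case class Scan(name: String) extends Plan
  case class Union(children: Seq[Plan]) extends Plan

  @scala.annotation.tailrec
  def collectUnionChildren(stack: List[Plan], acc: Vector[Plan]): Vector[Plan] =
    stack match {
      case Nil => acc
      case Union(grandchildren) :: rest =>
        // Prepend children so the leftmost is popped first: depth-first, in order.
        collectUnionChildren(grandchildren.toList ++ rest, acc)
      case other :: rest =>
        collectUnionChildren(rest, acc :+ other)
    }

  val nested = Union(Seq(Union(Seq(Scan("a"), Scan("b"))), Scan("c")))
  println(collectUnionChildren(List(nested), Vector()))
  // Vector(Scan(a), Scan(b), Scan(c)) — the left-to-right order the reviewers
  // discuss above is preserved.
}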
Comment: Minor: you might actually just make Union smart enough to return the node itself when there is only one.
Reply: Oh, sorry, I assumed it was varargs. Maybe this is hard to do.
Reply: After the merge, I am planning to introduce a new rule in Optimizer to detect if any child of Union can be pruned, or, if there exists only one child (e.g., after pruning), to remove the Union operator from the logical plan. I am wondering if this sounds good to you? Thank you!
Reply: Now we will eliminate one-child Union during analysis, so it's OK to just return Union here.
Reply: This one is pretty tricky. If we add Union here, many test cases will fail even if we eliminate Union in the early stage of the analyzer. Let me show you my analysis: in the following test case, if we always add Union in the parser as you suggested above, the test case will not execute the statement sql("INSERT OVERWRITE TABLE p SELECT * FROM t"). In Dataframe.scala, we have code to handle InsertIntoTable in an eager way. I assume we still want to keep this as a special case? Please let me know what you think. Thanks! @cloud-fan
Reply: Ah, makes sense, let's keep it.