-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-12705] [SPARK-10777] [SQL] Analyzer Rule ResolveSortReferences #10678
Changes from all commits
c2fcaa8
5ca4630
da6baf2
b5de079
d164342
27fcaa5
7fc98e4
0311239
522626b
26945fa
bd3ed13
831baf5
7f11395
598a673
1964884
ba02f46
5bfda35
ddfebbf
c2964da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
|
||
package org.apache.spark.sql.catalyst.analysis | ||
|
||
import scala.annotation.tailrec | ||
import scala.collection.mutable.ArrayBuffer | ||
|
||
import org.apache.spark.sql.AnalysisException | ||
|
@@ -440,7 +441,7 @@ class Analyzer( | |
} | ||
|
||
// When resolve `SortOrder`s in Sort based on child, don't report errors as | ||
// we still have chance to resolve it based on grandchild | ||
// we still have chance to resolve it based on its descendants | ||
case s @ Sort(ordering, global, child) if child.resolved && !s.resolved => | ||
val newOrdering = resolveSortOrders(ordering, child, throws = false) | ||
Sort(newOrdering, global, child) | ||
|
@@ -521,38 +522,96 @@ class Analyzer( | |
*/ | ||
object ResolveSortReferences extends Rule[LogicalPlan] { | ||
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { | ||
case s @ Sort(ordering, global, p @ Project(projectList, child)) | ||
if !s.resolved && p.resolved => | ||
val (newOrdering, missing) = resolveAndFindMissing(ordering, p, child) | ||
// Skip sort with aggregate. This will be handled in ResolveAggregateFunctions | ||
case sa @ Sort(_, _, child: Aggregate) => sa | ||
|
||
// If this rule was not a no-op, return the transformed plan, otherwise return the original. | ||
if (missing.nonEmpty) { | ||
// Add missing attributes and then project them away after the sort. | ||
Project(p.output, | ||
Sort(newOrdering, global, | ||
Project(projectList ++ missing, child))) | ||
} else { | ||
logDebug(s"Failed to find $missing in ${p.output.mkString(", ")}") | ||
case s @ Sort(_, _, child) if !s.resolved && child.resolved => | ||
val (newOrdering, missingResolvableAttrs) = collectResolvableMissingAttrs(s.order, child) | ||
|
||
if (missingResolvableAttrs.isEmpty) { | ||
val unresolvableAttrs = s.order.filterNot(_.resolved) | ||
logDebug(s"Failed to find $unresolvableAttrs in ${child.output.mkString(", ")}") | ||
s // Nothing we can do here. Return original plan. | ||
} else { | ||
// Add the missing attributes into projectList of Project/Window or | ||
// aggregateExpressions of Aggregate, if they are in the inputSet | ||
// but not in the outputSet of the plan. | ||
val newChild = child transformUp { | ||
case p: Project => | ||
p.copy(projectList = p.projectList ++ | ||
missingResolvableAttrs.filter((p.inputSet -- p.outputSet).contains)) | ||
case w: Window => | ||
w.copy(projectList = w.projectList ++ | ||
missingResolvableAttrs.filter((w.inputSet -- w.outputSet).contains)) | ||
case a: Aggregate => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This case can never happen right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the following query, we can trigger this case. Actually, this query is based on the failed TPCDS queries. Thus, we added it as a test case. The column If we remove this case, we will get this error:
|
||
val resolvableAttrs = missingResolvableAttrs.filter(a.groupingExpressions.contains) | ||
val notResolvedAttrs = resolvableAttrs.filterNot(a.aggregateExpressions.contains) | ||
val newAggregateExpressions = a.aggregateExpressions ++ notResolvedAttrs | ||
a.copy(aggregateExpressions = newAggregateExpressions) | ||
case o => o | ||
} | ||
|
||
// Add missing attributes and then project them away after the sort. | ||
Project(child.output, | ||
Sort(newOrdering, s.global, newChild)) | ||
} | ||
} | ||
|
||
/** | ||
* Given a child and a grandchild that are present beneath a sort operator, try to resolve | ||
* the sort ordering and returns it with a list of attributes that are missing from the | ||
* child but are present in the grandchild. | ||
* Traverse the tree until resolving the sorting attributes | ||
* Return all the resolvable missing sorting attributes | ||
*/ | ||
@tailrec | ||
private def collectResolvableMissingAttrs( | ||
ordering: Seq[SortOrder], | ||
plan: LogicalPlan): (Seq[SortOrder], Seq[Attribute]) = { | ||
plan match { | ||
// Only Windows and Project have projectList-like attribute. | ||
case un: UnaryNode if un.isInstanceOf[Project] || un.isInstanceOf[Window] => | ||
val (newOrdering, missingAttrs) = resolveAndFindMissing(ordering, un, un.child) | ||
// If missingAttrs is non empty, that means we got it and return it; | ||
// Otherwise, continue to traverse the tree. | ||
if (missingAttrs.nonEmpty) { | ||
(newOrdering, missingAttrs) | ||
} else { | ||
collectResolvableMissingAttrs(ordering, un.child) | ||
} | ||
case a: Aggregate => | ||
val (newOrdering, missingAttrs) = resolveAndFindMissing(ordering, a, a.child) | ||
// For Aggregate, all the order by columns must be specified in group by clauses | ||
if (missingAttrs.nonEmpty && | ||
missingAttrs.forall(ar => a.groupingExpressions.exists(_.semanticEquals(ar)))) { | ||
(newOrdering, missingAttrs) | ||
} else { | ||
// If missingAttrs is empty, we are unable to resolve any unresolved missing attributes | ||
(Seq.empty[SortOrder], Seq.empty[Attribute]) | ||
} | ||
// Jump over the following UnaryNode types | ||
// The output of these types is the same as their child's output | ||
case _: Distinct | | ||
_: Filter | | ||
_: RepartitionByExpression => | ||
collectResolvableMissingAttrs(ordering, plan.asInstanceOf[UnaryNode].child) | ||
// If hitting the other unsupported operators, we are unable to resolve it. | ||
case other => (Seq.empty[SortOrder], Seq.empty[Attribute]) | ||
} | ||
} | ||
|
||
/** | ||
* Try to resolve the sort ordering and returns it with a list of attributes that are missing | ||
* from the plan but are present in the child. | ||
*/ | ||
def resolveAndFindMissing( | ||
private def resolveAndFindMissing( | ||
ordering: Seq[SortOrder], | ||
child: LogicalPlan, | ||
grandchild: LogicalPlan): (Seq[SortOrder], Seq[Attribute]) = { | ||
val newOrdering = resolveSortOrders(ordering, grandchild, throws = true) | ||
plan: LogicalPlan, | ||
child: LogicalPlan): (Seq[SortOrder], Seq[Attribute]) = { | ||
val newOrdering = resolveSortOrders(ordering, child, throws = false) | ||
// Construct a set that contains all of the attributes that we need to evaluate the | ||
// ordering. | ||
val requiredAttributes = AttributeSet(newOrdering).filter(_.resolved) | ||
// Figure out which ones are missing from the projection, so that we can add them and | ||
// remove them after the sort. | ||
val missingInProject = requiredAttributes -- child.output | ||
val missingInProject = requiredAttributes -- plan.outputSet | ||
// It is important to return the new SortOrders here, instead of waiting for the standard | ||
// resolving process as adding attributes to the project below can actually introduce | ||
// ambiguity that was not present before. | ||
|
@@ -707,7 +766,7 @@ class Analyzer( | |
} | ||
} | ||
|
||
protected def containsAggregate(condition: Expression): Boolean = { | ||
def containsAggregate(condition: Expression): Boolean = { | ||
condition.find(_.isInstanceOf[AggregateExpression]).isDefined | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: too many spaces