[SPARK-24865] Remove AnalysisBarrier #21822
Closed
Commits (14)
f6f2bcc  [SPARK-24865] Remove AnalysisBarrier (rxin)
8ccafca  Minimize change (rxin)
0afa7ea  Fix bug (rxin)
738e99c  More bug fixes (rxin)
83ffa51  more fixes (rxin)
7c76c83  bypass EliminateSubqueryAliases (rxin)
14ac09c  Added BooleanSimplification back (rxin)
38980ad  revert mistake (rxin)
abfd0a8  Switch to use thread local to make things go faster. (rxin)
75fb114  Test cases (rxin)
f2f1a97  Remove one TODO (rxin)
e7d3f53  Merge remote-tracking branch 'upstream/master' into SPARK-24865-xiao (gatorsmile)
7995272  add a dummy fix (gatorsmile)
fe52801  Merge pull request #22 from gatorsmile/SPARK-24865-xiao (rxin)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: 136 changes (61 additions, 75 deletions). Large diffs are not rendered by default.
TypeCoercion.scala (each hunk replaces a generic transform call with the analyzer-aware resolve variant):

@@ -318,7 +318,7 @@ object TypeCoercion {
    */
   object WidenSetOperationTypes extends Rule[LogicalPlan] {
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case s @ SetOperation(left, right) if s.childrenResolved &&
           left.output.length == right.output.length && !s.resolved =>
         val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)

@@ -391,7 +391,7 @@ object TypeCoercion {
     }
 
     override protected def coerceTypes(
-        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+        plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e

@@ -453,7 +453,7 @@ object TypeCoercion {
     }
 
     override protected def coerceTypes(
-        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+        plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e

@@ -512,7 +512,7 @@ object TypeCoercion {
     private val trueValues = Seq(1.toByte, 1.toShort, 1, 1L, Decimal.ONE)
     private val falseValues = Seq(0.toByte, 0.toShort, 0, 0L, Decimal.ZERO)
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e

@@ -555,7 +555,7 @@ object TypeCoercion {
   object FunctionArgumentConversion extends TypeCoercionRule {
 
     override protected def coerceTypes(
-        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+        plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e

@@ -670,7 +670,7 @@ object TypeCoercion {
    */
   object Division extends TypeCoercionRule {
     override protected def coerceTypes(
-        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+        plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who has not been resolved yet,
       // as this is an extra rule which should be applied at last.
       case e if !e.childrenResolved => e

@@ -693,7 +693,7 @@ object TypeCoercion {
    */
   object CaseWhenCoercion extends TypeCoercionRule {
     override protected def coerceTypes(
-        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+        plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       case c: CaseWhen if c.childrenResolved && !haveSameType(c.inputTypesForMerging) =>
         val maybeCommonType = findWiderCommonType(c.inputTypesForMerging)
         maybeCommonType.map { commonType =>

@@ -711,7 +711,7 @@ object TypeCoercion {
    */
   object IfCoercion extends TypeCoercionRule {
     override protected def coerceTypes(
-        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+        plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       case e if !e.childrenResolved => e
       // Find tightest common type for If, if the true value and false value have different types.
       case i @ If(pred, left, right) if !haveSameType(i.inputTypesForMerging) =>

@@ -731,7 +731,7 @@ object TypeCoercion {
    * Coerces NullTypes in the Stack expression to the column types of the corresponding positions.
    */
   object StackCoercion extends TypeCoercionRule {
-    override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       case s @ Stack(children) if s.childrenResolved && s.hasFoldableNumRows =>
         Stack(children.zipWithIndex.map {
           // The first child is the number of rows for stack.

@@ -751,7 +751,8 @@ object TypeCoercion {
    */
   case class ConcatCoercion(conf: SQLConf) extends TypeCoercionRule {
 
-    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
+    override protected def coerceTypes(
+        plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { case p =>
       p transformExpressionsUp {
         // Skip nodes if unresolved or empty children
         case c @ Concat(children) if !c.childrenResolved || children.isEmpty => c

(Review comment on this hunk: im using a weird wrapping here to minimize the diff.)

@@ -773,7 +774,8 @@ object TypeCoercion {
    */
   case class EltCoercion(conf: SQLConf) extends TypeCoercionRule {
 
-    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
+    override protected def coerceTypes(
+        plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { case p =>
       p transformExpressionsUp {
         // Skip nodes if unresolved or not enough children
         case c @ Elt(children) if !c.childrenResolved || children.size < 2 => c

@@ -801,7 +803,7 @@ object TypeCoercion {
     private val acceptedTypes = Seq(DateType, TimestampType, StringType)
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e

@@ -822,7 +824,7 @@ object TypeCoercion {
     private def rejectTzInString = conf.getConf(SQLConf.REJECT_TIMEZONE_IN_STRING)
 
     override protected def coerceTypes(
-        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+        plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e

@@ -961,7 +963,7 @@ object TypeCoercion {
    */
   object WindowFrameCoercion extends TypeCoercionRule {
     override protected def coerceTypes(
-        plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+        plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       case s @ WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
           if order.resolved =>
         s.copy(frameSpecification = SpecifiedWindowFrame(

@@ -999,7 +1001,7 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging {
   protected def coerceTypes(plan: LogicalPlan): LogicalPlan
 
-  private def propagateTypes(plan: LogicalPlan): LogicalPlan = plan transformUp {
+  private def propagateTypes(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     // No propagation required for leaf nodes.
     case q: LogicalPlan if q.children.isEmpty => q
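The pattern in every hunk above is the same: each rule keeps its matching logic and only changes the traversal method, from transformUp / transformAllExpressions / transform to resolveOperators / resolveExpressions / resolveOperatorsDown, which skip subtrees already marked as analyzed (see the LogicalPlan changes below). A minimal sketch of what that switch looks like in a rule, with a hypothetical rule name and a deliberately trivial match arm:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical coercion-style rule, shown only to illustrate the traversal change.
object ExampleCoercionRule extends Rule[LogicalPlan] {
  // Before this PR a rule like this would call `plan transformAllExpressions { ... }`,
  // visiting every node, including subtrees the analyzer has already finished with.
  // After this PR it calls `resolveExpressions`, which skips subtrees whose
  // `analyzed` flag has been set.
  def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
    // Skip expressions whose children have not been resolved yet.
    case e if !e.childrenResolved => e
  }
}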
LogicalPlan.scala:
@@ -25,6 +25,21 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+
+object LogicalPlan {
+  private val bypassTransformAnalyzerCheckFlag = new ThreadLocal[Boolean] {
+    override def initialValue(): Boolean = false
+  }
+
+  def bypassTransformAnalyzerCheck[T](p: => T): T = {
+    bypassTransformAnalyzerCheckFlag.set(true)
+    try p finally {
+      bypassTransformAnalyzerCheckFlag.set(false)
+    }
+  }
+}
+
 
 abstract class LogicalPlan
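The companion object above exists so that the test-only assertion added further down (assertNotAnalysisRule) can be switched off for a block of code that legitimately needs one of the raw transform methods while analysis is running. A rough usage sketch; the helper name and the identity rewrite are hypothetical, only the bypassTransformAnalyzerCheck call comes from this diff:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object BypassSketch {
  // Hypothetical helper that still needs a raw transformUp during analysis.
  // bypassTransformAnalyzerCheck sets the thread-local flag for the enclosed
  // block only and restores it in a finally clause, even if the body throws.
  def forceRewrite(plan: LogicalPlan): LogicalPlan =
    LogicalPlan.bypassTransformAnalyzerCheck {
      plan transformUp {
        case p => p // hypothetical rewrite; identity shown for brevity
      }
    }
}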
@@ -33,6 +48,98 @@ abstract class LogicalPlan
   with QueryPlanConstraints
   with Logging {
 
+  private var _analyzed: Boolean = false
+
+  /**
+   * Marks this plan as already analyzed. This should only be called by [[CheckAnalysis]].
+   */
+  private[catalyst] def setAnalyzed(): Unit = { _analyzed = true }
+
+  /**
+   * Returns true if this node and its children have already been gone through analysis and
+   * verification. Note that this is only an optimization used to avoid analyzing trees that
+   * have already been analyzed, and can be reset by transformations.
+   */
+  def analyzed: Boolean = _analyzed
+
+  /**
+   * Returns a copy of this node where `rule` has been recursively applied first to all of its
+   * children and then itself (post-order, bottom-up). When `rule` does not apply to a given node,
+   * it is left unchanged. This function is similar to `transformUp`, but skips sub-trees that
+   * have already been marked as analyzed.
+   *
+   * @param rule the function use to transform this nodes children
+   */
+  def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
+    if (!analyzed) {
+      val afterRuleOnChildren = mapChildren(_.resolveOperators(rule))
+      if (this fastEquals afterRuleOnChildren) {
+        CurrentOrigin.withOrigin(origin) {
+          rule.applyOrElse(this, identity[LogicalPlan])
+        }
+      } else {
+        CurrentOrigin.withOrigin(origin) {
+          rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
+        }
+      }
+    } else {
+      this
+    }
+  }
+
+  /** Similar to [[resolveOperators]], but does it top-down. */
+  def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
+    if (!analyzed) {
+      val afterRule = CurrentOrigin.withOrigin(origin) {
+        rule.applyOrElse(this, identity[LogicalPlan])
+      }
+
+      // Check if unchanged and then possibly return old copy to avoid gc churn.
+      if (this fastEquals afterRule) {
+        mapChildren(_.resolveOperatorsDown(rule))
+      } else {
+        afterRule.mapChildren(_.resolveOperatorsDown(rule))
+      }
+    } else {
+      this
+    }
+  }
+
+  /**
+   * Recursively transforms the expressions of a tree, skipping nodes that have already
+   * been analyzed.
+   */
+  def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan = {
+    this resolveOperators {
+      case p => p.transformExpressions(r)
+    }
+  }
+
+  protected def assertNotAnalysisRule(): Unit = {
+    if (Utils.isTesting && !LogicalPlan.bypassTransformAnalyzerCheckFlag.get) {
+      if (Thread.currentThread.getStackTrace.exists(_.getClassName.contains("Analyzer"))) {
+        val e = new RuntimeException("This method should not be called in the analyzer")
+        e.printStackTrace()
+        throw e
+      }
+    }
+  }
+
+  override def transformDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
+    assertNotAnalysisRule()
+    super.transformDown(rule)
+  }
+
+  override def transformUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
+    assertNotAnalysisRule()
+    super.transformUp(rule)
+  }
+
+  override def transformAllExpressions(rule: PartialFunction[Expression, Expression]): this.type = {
+    assertNotAnalysisRule()
+    super.transformAllExpressions(rule)
+  }
+
   /** Returns true if this subtree has data from a streaming data source. */
   def isStreaming: Boolean = children.exists(_.isStreaming == true)

(Review comment on the resolveOperators definition: todo: add unit tests.)
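Taken together, the analyzed flag, the resolveOperators family, and assertNotAnalysisRule replace what AnalysisBarrier used to do: instead of wrapping analyzed plans in a barrier node, the plan itself remembers that it has been analyzed, and in testing any analyzer-side code that falls back to the unguarded transform methods fails fast. A small sketch of that contract, with hypothetical rule names and trivial match arms:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule written the intended way: resolveOperators skips subtrees
// that CheckAnalysis has already marked as analyzed.
object GoodAnalyzerRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case p => p // hypothetical rewrite
  }
}

// Hypothetical rule written the old way: when Utils.isTesting is true and the
// call originates from a class whose name contains "Analyzer", transformUp now
// goes through assertNotAnalysisRule and throws a RuntimeException.
object BadAnalyzerRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case p => p // would trip the assertion when run from the Analyzer in tests
  }
}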
cc @dilipbiswal @cloud-fan @gatorsmile
Why did we need BooleanSimplification here?
@rxin From what I remember, Reynold, most of this logic was housed in the Analyzer before, and we moved it to the optimizer. In the old code we used to walk the plan after simplifying the predicates; the comment used to read "Simplify the predicates before pulling them out." I just retained those semantics.
@rxin I tracked down the PR that introduced the change in the Analyzer. Here is the link: #12954
Thanks. I'm going to add it back.
Yeah, I added boolean simplification here. I didn't quite like it back then, and I still don't like it. I was hoping this was happening in the Optimizer now.
@hvanhovell Hi Herman, as you said, we do the actual pulling up of the predicates in the optimizer, in PullupCorrelatedPredicates in subquery.scala. We also run BooleanSimplification there before traversing the plan. Here we only do the error reporting, and I thought it would be better to keep the traversal the same way. Basically, previously we did both the error reporting and the rewriting in the Analyzer; now we do the error reporting in checkAnalysis and the rewriting in the Optimizer. Just to refresh your memory so you can help make the right call here :-)
Well, tests fail without it, so we don't really have a choice here. For a second I thought we could also create some utils class, but that would just mean moving the code from BooleanSimplification into it purely for aesthetics.
@hvanhovell Yeah. I agree.
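For context, the reason simplifying predicates first helps is that the later splitting and inspection works conjunct by conjunct, so normalizing away negated disjunctions and similar shapes exposes predicates that would otherwise stay hidden inside a single expression. A rough, hypothetical illustration of the kind of rewrite BooleanSimplification performs; this is not the actual checkAnalysis or PullupCorrelatedPredicates code, and the relation and predicates are made up:

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
import org.apache.spark.sql.types.IntegerType

object BooleanSimplificationSketch {
  // Hypothetical two-column relation, only for illustration.
  private val a = AttributeReference("a", IntegerType)()
  private val b = AttributeReference("b", IntegerType)()
  private val relation = LocalRelation(a, b)

  // NOT (a = 1 OR b = 2) cannot be split into conjuncts as written; after
  // BooleanSimplification it becomes (NOT a = 1) AND (NOT b = 2), so each
  // conjunct can then be inspected on its own.
  private val filter: LogicalPlan =
    Filter(Not(Or(EqualTo(a, Literal(1)), EqualTo(b, Literal(2)))), relation)

  val simplified: LogicalPlan = BooleanSimplification(filter)
}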