
[SPARK-24865] Remove AnalysisBarrier #21822

Closed
wants to merge 14 commits into from

@@ -79,6 +79,9 @@ trait CheckAnalysis extends PredicateHelper {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {

case p if p.analyzed => // Skip already analyzed sub-plans

case u: UnresolvedRelation =>
u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")

@@ -364,10 +367,11 @@ trait CheckAnalysis extends PredicateHelper {
}
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case AnalysisBarrier(child) if !child.resolved => checkAnalysis(child)
case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
case _ =>
}

plan.foreach(_.setAnalyzed())
}
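
The new `plan.foreach(_.setAnalyzed())` call marks every node of a plan that passed checking, so later analyzer runs can skip the whole tree. A minimal sketch of that marking idea, using invented names rather than Spark's classes:

```scala
// Toy tree: after a successful check, mark every node so that later
// resolve-style traversals can short-circuit the entire subtree.
class Marked(var analyzed: Boolean = false, val children: List[Marked] = Nil) {
  // Pre-order visit of this node and all descendants.
  def foreachAll(f: Marked => Unit): Unit = {
    f(this)
    children.foreach(_.foreachAll(f))
  }
}

object MarkDemo extends App {
  val leaf = new Marked()
  val root = new Marked(children = List(leaf))
  root.foreachAll(_.analyzed = true) // analogous to plan.foreach(_.setAnalyzed())
  assert(root.analyzed && leaf.analyzed)
}
```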

/**
@@ -533,7 +537,8 @@ trait CheckAnalysis extends PredicateHelper {

// Simplify the predicates before validating any unsupported correlation patterns
// in the plan.
BooleanSimplification(sub).foreachUp {
// TODO(rxin): Why did this need to call BooleanSimplification???
Contributor Author:
cc @dilipbiswal @cloud-fan @gatorsmile

Why did we need BooleanSimplification here?

Contributor:

@rxin From what I remember, Reynold, most of this logic was housed in the Analyzer before and we moved it to the optimizer. In the old code we used to walk the plan after simplifying the predicates; the comment used to read "Simplify the predicates before pulling them out." I just retained those semantics.

Contributor:

@rxin I tracked down the PR that introduced the change in the Analyzer. Here is the link: #12954

Contributor Author:

Thanks. I'm going to add it back.

Contributor:

Yeah, I added boolean simplification here. I didn't quite like it back then, and I still don't like it. I was hoping this was happening in the Optimizer now.

Contributor:

@hvanhovell Hi Herman, as you said, we do the actual pulling up of the predicates in the optimizer, in PullupCorrelatedPredicates in subquery.scala, and we also apply BooleanSimplification there before traversing the plan. Here we only do the error reporting, and I thought it would be better to keep the traversal the same. Previously we did both the error reporting and the rewriting in the Analyzer; now we do the error reporting in checkAnalysis and the rewriting in the Optimizer. Just refreshing your memory so you can help make the right call here :-)

Contributor:

Well, tests fail without it, so we don't really have a choice here. For a second I thought we could also create some utils class, but that would just mean moving the code from BooleanSimplification in there purely for aesthetics.

Contributor:

@hvanhovell Yeah. I agree.
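
As context for the thread above: simplifying predicates first normalizes them, so the pattern checks that follow see a canonical form. A toy illustration of the idea (this is not Spark's BooleanSimplification; all names here are invented):

```scala
// Toy boolean simplifier: a pattern looking for a bare equality predicate
// only matches `a = b AND true` after `AND true` has been rewritten away.
object ToySimplifier {
  sealed trait Expr
  case class Lit(b: Boolean) extends Expr
  case class Eq(l: String, r: String) extends Expr
  case class And(l: Expr, r: Expr) extends Expr

  def simplify(e: Expr): Expr = e match {
    case And(x, Lit(true)) => simplify(x) // drop redundant `AND true`
    case And(Lit(true), x) => simplify(x)
    case And(x, y)         => And(simplify(x), simplify(y))
    case other             => other
  }
}

object SimplifyDemo extends App {
  import ToySimplifier._
  // Before simplification the top node is an And; after, the bare equality.
  assert(simplify(And(Eq("a", "b"), Lit(true))) == Eq("a", "b"))
}
```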

sub.foreachUp {
// Whitelist operators allowed in a correlated subquery
// There are 4 categories:
// 1. Operators that are allowed anywhere in a correlated subquery, and,
@@ -82,7 +82,7 @@ object DecimalPrecision extends TypeCoercionRule {
PromotePrecision(Cast(e, dataType))
}

override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformUp {
override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
// fix decimal precision for expressions
case q => q.transformExpressionsUp(
decimalAndDecimal.orElse(integralAndDecimalLiteral).orElse(nondecimalAndDecimal))
@@ -85,7 +85,7 @@ object ResolveHints {
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
if (h.parameters.isEmpty) {
// If there is no table alias specified, turn the entire subtree into a BroadcastHint.
@@ -107,7 +107,7 @@
* This must be executed after all the other hint rules are executed.
*/
object RemoveAllHints extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case h: UnresolvedHint => h.child
}
}
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
* An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]].
*/
case class ResolveInlineTables(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case table: UnresolvedInlineTable if table.expressionsResolved =>
validateInputDimension(table)
validateInputEvaluable(table)
@@ -103,7 +103,7 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
})
)

override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
val resolvedFunc = builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
case Some(tvf) =>
@@ -33,7 +33,7 @@ class SubstituteUnresolvedOrdinals(conf: SQLConf) extends Rule[LogicalPlan] {
case _ => false
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case s: Sort if conf.orderByOrdinal && s.order.exists(o => isIntLiteral(o.child)) =>
val newOrders = s.order.map {
case order @ SortOrder(ordinal @ Literal(index: Int, IntegerType), _, _, _) =>
@@ -318,7 +318,7 @@ object TypeCoercion {
*/
object WidenSetOperationTypes extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case s @ SetOperation(left, right) if s.childrenResolved &&
left.output.length == right.output.length && !s.resolved =>
val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
@@ -391,7 +391,7 @@ object TypeCoercion {
}

override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

@@ -453,7 +453,7 @@ object TypeCoercion {
}

override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

@@ -512,7 +512,7 @@ object TypeCoercion {
private val trueValues = Seq(1.toByte, 1.toShort, 1, 1L, Decimal.ONE)
private val falseValues = Seq(0.toByte, 0.toShort, 0, 0L, Decimal.ZERO)

def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

@@ -555,7 +555,7 @@ object TypeCoercion {
object FunctionArgumentConversion extends TypeCoercionRule {

override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

@@ -670,7 +670,7 @@ object TypeCoercion {
*/
object Division extends TypeCoercionRule {
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who has not been resolved yet,
// as this is an extra rule which should be applied at last.
case e if !e.childrenResolved => e
@@ -693,7 +693,7 @@
*/
object CaseWhenCoercion extends TypeCoercionRule {
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case c: CaseWhen if c.childrenResolved && !haveSameType(c.inputTypesForMerging) =>
val maybeCommonType = findWiderCommonType(c.inputTypesForMerging)
maybeCommonType.map { commonType =>
@@ -711,7 +711,7 @@
*/
object IfCoercion extends TypeCoercionRule {
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case e if !e.childrenResolved => e
// Find tightest common type for If, if the true value and false value have different types.
case i @ If(pred, left, right) if !haveSameType(i.inputTypesForMerging) =>
@@ -731,7 +731,7 @@
* Coerces NullTypes in the Stack expression to the column types of the corresponding positions.
*/
object StackCoercion extends TypeCoercionRule {
override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case s @ Stack(children) if s.childrenResolved && s.hasFoldableNumRows =>
Stack(children.zipWithIndex.map {
// The first child is the number of rows for stack.
@@ -751,7 +751,8 @@
*/
case class ConcatCoercion(conf: SQLConf) extends TypeCoercionRule {

override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { case p =>
Contributor Author:

I'm using unusual wrapping here to minimize the diff.

p transformExpressionsUp {
// Skip nodes if unresolved or empty children
case c @ Concat(children) if !c.childrenResolved || children.isEmpty => c
@@ -773,7 +774,8 @@
*/
case class EltCoercion(conf: SQLConf) extends TypeCoercionRule {

override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { case p =>
p transformExpressionsUp {
// Skip nodes if unresolved or not enough children
case c @ Elt(children) if !c.childrenResolved || children.size < 2 => c
@@ -801,7 +803,7 @@ object TypeCoercion {

private val acceptedTypes = Seq(DateType, TimestampType, StringType)

def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

@@ -822,7 +824,7 @@
private def rejectTzInString = conf.getConf(SQLConf.REJECT_TIMEZONE_IN_STRING)

override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

@@ -961,7 +963,7 @@ object TypeCoercion {
*/
object WindowFrameCoercion extends TypeCoercionRule {
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case s @ WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
if order.resolved =>
s.copy(frameSpecification = SpecifiedWindowFrame(
@@ -999,7 +1001,7 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging {

protected def coerceTypes(plan: LogicalPlan): LogicalPlan

private def propagateTypes(plan: LogicalPlan): LogicalPlan = plan transformUp {
private def propagateTypes(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
// No propagation required for leaf nodes.
case q: LogicalPlan if q.children.isEmpty => q

@@ -38,7 +38,7 @@ case class ResolveTimeZone(conf: SQLConf) extends Rule[LogicalPlan] {
}

override def apply(plan: LogicalPlan): LogicalPlan =
plan.transformAllExpressions(transformTimeZoneExprs)
plan.resolveExpressions(transformTimeZoneExprs)

def resolveTimeZones(e: Expression): Expression = e.transform(transformTimeZoneExprs)
}
@@ -48,7 +48,7 @@ import org.apache.spark.sql.internal.SQLConf
* completely resolved during the batch of Resolution.
*/
case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case v @ View(desc, output, child) if child.resolved && output != child.output =>
val resolver = conf.resolver
val queryColumnNames = desc.viewQueryColumnNames
@@ -25,6 +25,21 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils


object LogicalPlan {
private val bypassTransformAnalyzerCheckFlag = new ThreadLocal[Boolean] {
override def initialValue(): Boolean = false
}

def bypassTransformAnalyzerCheck[T](p: => T): T = {
bypassTransformAnalyzerCheckFlag.set(true)
try p finally {
bypassTransformAnalyzerCheckFlag.set(false)
}
}
}
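
The companion object above gates a test-only check behind a ThreadLocal flag. A standalone sketch of the same set-then-reset pattern (names here are illustrative, not Spark's):

```scala
// Standalone sketch of the ThreadLocal bypass-flag pattern used above.
object BypassFlag {
  private val flag = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = false
  }

  def enabled: Boolean = flag.get()

  // Runs `body` with the flag set, clearing it afterwards even on exception.
  def bypass[T](body: => T): T = {
    flag.set(true)
    try body finally flag.set(false)
  }
}

object BypassDemo extends App {
  assert(!BypassFlag.enabled)
  val seenInside = BypassFlag.bypass { BypassFlag.enabled }
  assert(seenInside)       // flag visible inside the bypassed block
  assert(!BypassFlag.enabled) // cleared again afterwards
}
```

Note that, as in the PR, the `finally` clause resets the flag to false rather than restoring its previous value, so nested `bypass` calls are not reentrant: the inner call's exit clears the flag for the outer one too.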


@@ -33,6 +48,98 @@ abstract class LogicalPlan
with QueryPlanConstraints
with Logging {

private var _analyzed: Boolean = false

/**
* Marks this plan as already analyzed. This should only be called by [[CheckAnalysis]].
*/
private[catalyst] def setAnalyzed(): Unit = { _analyzed = true }

/**
* Returns true if this node and its children have already gone through analysis and
* verification. Note that this is only an optimization used to avoid analyzing trees that
* have already been analyzed, and can be reset by transformations.
*/
def analyzed: Boolean = _analyzed

/**
* Returns a copy of this node where `rule` has been recursively applied first to all of its
* children and then itself (post-order, bottom-up). When `rule` does not apply to a given node,
* it is left unchanged. This function is similar to `transformUp`, but skips sub-trees that
* have already been marked as analyzed.
*
* @param rule the function used to transform this node's children
*/
def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
Contributor Author:

TODO: add unit tests

if (!analyzed) {
val afterRuleOnChildren = mapChildren(_.resolveOperators(rule))
if (this fastEquals afterRuleOnChildren) {
CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(this, identity[LogicalPlan])
}
} else {
CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
}
}
} else {
this
}
}
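
A minimal model of the skip behavior that `resolveOperators` adds over a plain bottom-up transform, using an invented toy tree rather than Spark's TreeNode:

```scala
// Toy tree illustrating resolveOperators-style bottom-up rewriting that
// skips subtrees already marked analyzed. All names are illustrative.
case class Node(name: String, children: List[Node] = Nil,
                analyzed: Boolean = false) {
  def resolveOps(rule: PartialFunction[Node, Node]): Node =
    if (analyzed) this // skip: this subtree has already been analyzed
    else {
      // Post-order: rewrite children first, then apply the rule to this node.
      val withNewChildren = copy(children = children.map(_.resolveOps(rule)))
      rule.applyOrElse(withNewChildren, identity[Node])
    }
}

object ResolveDemo extends App {
  val rename: PartialFunction[Node, Node] = {
    case n if n.name.startsWith("u") => n.copy(name = "r" + n.name.drop(1))
  }
  val plan = Node("uProject", List(Node("uScan", analyzed = true)))
  val resolved = plan.resolveOps(rename)
  assert(resolved.name == "rProject")            // unanalyzed parent rewritten
  assert(resolved.children.head.name == "uScan") // analyzed child left alone
}
```

With `transformUp` there is no such short-circuit, which is why a fixed-point analyzer could previously re-run rules over already-resolved subtrees unless an AnalysisBarrier was inserted.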

/** Similar to [[resolveOperators]], but does it top-down. */
def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
if (!analyzed) {
val afterRule = CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(this, identity[LogicalPlan])
}

// Check if unchanged and then possibly return old copy to avoid gc churn.
if (this fastEquals afterRule) {
mapChildren(_.resolveOperatorsDown(rule))
} else {
afterRule.mapChildren(_.resolveOperatorsDown(rule))
}
} else {
this
}
}

/**
* Recursively transforms the expressions of a tree, skipping nodes that have already
* been analyzed.
*/
def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan = {
this resolveOperators {
case p => p.transformExpressions(r)
}
}

protected def assertNotAnalysisRule(): Unit = {
if (Utils.isTesting && !LogicalPlan.bypassTransformAnalyzerCheckFlag.get) {
if (Thread.currentThread.getStackTrace.exists(_.getClassName.contains("Analyzer"))) {
val e = new RuntimeException("This method should not be called in the analyzer")
e.printStackTrace()
throw e
}
}
}

override def transformDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
assertNotAnalysisRule()
super.transformDown(rule)
}

override def transformUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
assertNotAnalysisRule()
super.transformUp(rule)
}

override def transformAllExpressions(rule: PartialFunction[Expression, Expression]): this.type = {
assertNotAnalysisRule()
super.transformAllExpressions(rule)
}
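
The `assertNotAnalysisRule` guard above detects analyzer-originated calls by scanning the current thread's stack trace for a marker class name. A standalone sketch of that stack-inspection technique (object names here are invented):

```scala
// Detect whether the current call was reached from a class whose name
// contains a marker, by walking the current thread's stack trace.
object StackGuard {
  def calledFrom(marker: String): Boolean =
    Thread.currentThread.getStackTrace.exists(_.getClassName.contains(marker))
}

object GuardDemo extends App {
  // This code runs inside an object named GuardDemo, so "GuardDemo" appears
  // on the stack while "Analyzer" does not.
  assert(StackGuard.calledFrom("GuardDemo"))
  assert(!StackGuard.calledFrom("Analyzer"))
}
```

Because this is string matching on class names, it is a test-only heuristic (hence the `Utils.isTesting` gate in the PR), not something to rely on in production code paths.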

/** Returns true if this subtree has data from a streaming data source. */
def isStreaming: Boolean = children.exists(_.isStreaming == true)

@@ -545,20 +545,6 @@ class AnalysisSuite extends AnalysisTest with Matchers {
}
}

test("SPARK-20392: analysis barrier") {
// [[AnalysisBarrier]] will be removed after analysis
checkAnalysis(
Project(Seq(UnresolvedAttribute("tbl.a")),
AnalysisBarrier(SubqueryAlias("tbl", testRelation))),
Project(testRelation.output, SubqueryAlias("tbl", testRelation)))

// Verify we won't go through a plan wrapped in a barrier.
// Since we wrap an unresolved plan and analyzer won't go through it. It remains unresolved.
val barrier = AnalysisBarrier(Project(Seq(UnresolvedAttribute("tbl.b")),
SubqueryAlias("tbl", testRelation)))
assertAnalysisError(barrier, Seq("cannot resolve '`tbl.b`'"))
}

test("SPARK-24208: analysis fails on self-join with FlatMapGroupsInPandas") {
val pythonUdf = PythonUDF("pyUDF", null,
StructType(Seq(StructField("a", LongType))),