Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-24865] Remove AnalysisBarrier #21822

Closed
wants to merge 14 commits into from

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ trait CheckAnalysis extends PredicateHelper {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {

case p if p.analyzed => // Skip already analyzed sub-plans

case u: UnresolvedRelation =>
u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")

Expand Down Expand Up @@ -364,10 +367,11 @@ trait CheckAnalysis extends PredicateHelper {
}
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case AnalysisBarrier(child) if !child.resolved => checkAnalysis(child)
case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
case _ =>
}

plan.setAnalyzed()
}

/**
Expand Down Expand Up @@ -531,9 +535,8 @@ trait CheckAnalysis extends PredicateHelper {

var foundNonEqualCorrelatedPred: Boolean = false

// Simplify the predicates before validating any unsupported correlation patterns
// in the plan.
BooleanSimplification(sub).foreachUp {
// Simplify the predicates before validating any unsupported correlation patterns in the plan.
AnalysisHelper.allowInvokingTransformsInAnalyzer { BooleanSimplification(sub).foreachUp {
// Whitelist operators allowed in a correlated subquery
// There are 4 categories:
// 1. Operators that are allowed anywhere in a correlated subquery, and,
Expand Down Expand Up @@ -635,6 +638,6 @@ trait CheckAnalysis extends PredicateHelper {
// are not allowed to have any correlated expressions.
case p =>
failOnOuterReferenceInSubTree(p)
}
}}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ object DecimalPrecision extends TypeCoercionRule {
PromotePrecision(Cast(e, dataType))
}

override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformUp {
override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
// fix decimal precision for expressions
case q => q.transformExpressionsUp(
decimalAndDecimal.orElse(integralAndDecimalLiteral).orElse(nondecimalAndDecimal))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object ResolveHints {
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
if (h.parameters.isEmpty) {
// If there is no table alias specified, turn the entire subtree into a BroadcastHint.
Expand All @@ -107,7 +107,7 @@ object ResolveHints {
* This must be executed after all the other hint rules are executed.
*/
object RemoveAllHints extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case h: UnresolvedHint => h.child
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
* An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]].
*/
case class ResolveInlineTables(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case table: UnresolvedInlineTable if table.expressionsResolved =>
validateInputDimension(table)
validateInputEvaluable(table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
})
)

override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
val resolvedFunc = builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
case Some(tvf) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class SubstituteUnresolvedOrdinals(conf: SQLConf) extends Rule[LogicalPlan] {
case _ => false
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case s: Sort if conf.orderByOrdinal && s.order.exists(o => isIntLiteral(o.child)) =>
val newOrders = s.order.map {
case order @ SortOrder(ordinal @ Literal(index: Int, IntegerType), _, _, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ object TypeCoercion {
*/
object WidenSetOperationTypes extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case s @ SetOperation(left, right) if s.childrenResolved &&
left.output.length == right.output.length && !s.resolved =>
val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
Expand Down Expand Up @@ -391,7 +391,7 @@ object TypeCoercion {
}

override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

Expand Down Expand Up @@ -453,7 +453,7 @@ object TypeCoercion {
}

override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

Expand Down Expand Up @@ -512,7 +512,7 @@ object TypeCoercion {
private val trueValues = Seq(1.toByte, 1.toShort, 1, 1L, Decimal.ONE)
private val falseValues = Seq(0.toByte, 0.toShort, 0, 0L, Decimal.ZERO)

def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

Expand Down Expand Up @@ -555,7 +555,7 @@ object TypeCoercion {
object FunctionArgumentConversion extends TypeCoercionRule {

override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

Expand Down Expand Up @@ -670,7 +670,7 @@ object TypeCoercion {
*/
object Division extends TypeCoercionRule {
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who has not been resolved yet,
// as this is an extra rule which should be applied at last.
case e if !e.childrenResolved => e
Expand All @@ -693,7 +693,7 @@ object TypeCoercion {
*/
object CaseWhenCoercion extends TypeCoercionRule {
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case c: CaseWhen if c.childrenResolved && !haveSameType(c.inputTypesForMerging) =>
val maybeCommonType = findWiderCommonType(c.inputTypesForMerging)
maybeCommonType.map { commonType =>
Expand All @@ -711,7 +711,7 @@ object TypeCoercion {
*/
object IfCoercion extends TypeCoercionRule {
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case e if !e.childrenResolved => e
// Find tightest common type for If, if the true value and false value have different types.
case i @ If(pred, left, right) if !haveSameType(i.inputTypesForMerging) =>
Expand All @@ -731,7 +731,7 @@ object TypeCoercion {
* Coerces NullTypes in the Stack expression to the column types of the corresponding positions.
*/
object StackCoercion extends TypeCoercionRule {
override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case s @ Stack(children) if s.childrenResolved && s.hasFoldableNumRows =>
Stack(children.zipWithIndex.map {
// The first child is the number of rows for stack.
Expand All @@ -751,7 +751,8 @@ object TypeCoercion {
*/
case class ConcatCoercion(conf: SQLConf) extends TypeCoercionRule {

override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { case p =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im using a weird wrapping here to minimize the diff.

p transformExpressionsUp {
// Skip nodes if unresolved or empty children
case c @ Concat(children) if !c.childrenResolved || children.isEmpty => c
Expand All @@ -773,7 +774,8 @@ object TypeCoercion {
*/
case class EltCoercion(conf: SQLConf) extends TypeCoercionRule {

override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { case p =>
p transformExpressionsUp {
// Skip nodes if unresolved or not enough children
case c @ Elt(children) if !c.childrenResolved || children.size < 2 => c
Expand Down Expand Up @@ -801,7 +803,7 @@ object TypeCoercion {

private val acceptedTypes = Seq(DateType, TimestampType, StringType)

def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

Expand All @@ -822,7 +824,7 @@ object TypeCoercion {
private def rejectTzInString = conf.getConf(SQLConf.REJECT_TIMEZONE_IN_STRING)

override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
// Skip nodes who's children have not been resolved yet.
case e if !e.childrenResolved => e

Expand Down Expand Up @@ -961,7 +963,7 @@ object TypeCoercion {
*/
object WindowFrameCoercion extends TypeCoercionRule {
override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case s @ WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
if order.resolved =>
s.copy(frameSpecification = SpecifiedWindowFrame(
Expand Down Expand Up @@ -999,7 +1001,7 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging {

protected def coerceTypes(plan: LogicalPlan): LogicalPlan

private def propagateTypes(plan: LogicalPlan): LogicalPlan = plan transformUp {
private def propagateTypes(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
// No propagation required for leaf nodes.
case q: LogicalPlan if q.children.isEmpty => q

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ case class ResolveTimeZone(conf: SQLConf) extends Rule[LogicalPlan] {
}

override def apply(plan: LogicalPlan): LogicalPlan =
plan.transformAllExpressions(transformTimeZoneExprs)
plan.resolveExpressions(transformTimeZoneExprs)

def resolveTimeZones(e: Expression): Expression = e.transform(transformTimeZoneExprs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import org.apache.spark.sql.internal.SQLConf
* completely resolved during the batch of Resolution.
*/
case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case v @ View(desc, output, child) if child.resolved && output != child.output =>
val resolver = conf.resolver
val queryColumnNames = desc.viewQueryColumnNames
Expand Down
Loading