Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34639][SQL] Always remove unnecessary Alias in Analyzer.resolveExpression #31758

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1628,11 +1628,11 @@ class Analyzer(override val catalogManager: CatalogManager)
}

val resolvedGroupingExprs = a.groupingExpressions
.map(resolveExpressionByPlanChildren(_, planForResolve, trimAlias = true))
.map(resolveExpressionByPlanChildren(_, planForResolve))
.map(trimTopLevelGetStructFieldAlias)

val resolvedAggExprs = a.aggregateExpressions
.map(resolveExpressionByPlanChildren(_, planForResolve, trimAlias = true))
.map(resolveExpressionByPlanChildren(_, planForResolve))
.map(_.asInstanceOf[NamedExpression])

a.copy(resolvedGroupingExprs, resolvedAggExprs, a.child)
Expand All @@ -1644,15 +1644,15 @@ class Analyzer(override val catalogManager: CatalogManager)
// of GetStructField here.
case g: GroupingSets =>
val resolvedSelectedExprs = g.selectedGroupByExprs
.map(_.map(resolveExpressionByPlanChildren(_, g, trimAlias = true))
.map(_.map(resolveExpressionByPlanChildren(_, g))
.map(trimTopLevelGetStructFieldAlias))

val resolvedGroupingExprs = g.groupByExprs
.map(resolveExpressionByPlanChildren(_, g, trimAlias = true))
.map(resolveExpressionByPlanChildren(_, g))
.map(trimTopLevelGetStructFieldAlias)

val resolvedAggExprs = g.aggregations
.map(resolveExpressionByPlanChildren(_, g, trimAlias = true))
.map(resolveExpressionByPlanChildren(_, g))
.map(_.asInstanceOf[NamedExpression])

g.copy(resolvedSelectedExprs, resolvedGroupingExprs, g.child, resolvedAggExprs)
Expand Down Expand Up @@ -1891,26 +1891,22 @@ class Analyzer(override val catalogManager: CatalogManager)
plan: LogicalPlan,
resolveColumnByName: Seq[String] => Option[Expression],
resolveColumnByOrdinal: Int => Attribute,
trimAlias: Boolean,
throws: Boolean): Expression = {
def innerResolve(e: Expression, isTopLevel: Boolean): Expression = {
if (e.resolved) return e
e match {
case f: LambdaFunction if !f.bound => f
case GetColumnByOrdinal(ordinal, _) => resolveColumnByOrdinal(ordinal)
case u @ UnresolvedAttribute(nameParts) =>
val resolved = withPosition(u) {
resolveColumnByName(nameParts)
.orElse(resolveLiteralFunction(nameParts, u, plan))
.getOrElse(u)
}
val result = resolved match {
// When trimAlias = true, we will trim unnecessary alias of `GetStructField` and
// we won't trim the alias of top-level `GetStructField`. Since we will call
// CleanupAliases later in Analyzer, trim non top-level unnecessary alias of
// `GetStructField` here is safe.
case Alias(s: GetStructField, _) if trimAlias && !isTopLevel => s
case others => others
val result = withPosition(u) {
resolveColumnByName(nameParts).map {
// We trim unnecessary alias here. Note that, we cannot trim the alias at top-level,
// as we should resolve `UnresolvedAttribute` to a named expression. The caller side
// can trim the top-level alias if it's safe to do so. Since we will call
// CleanupAliases later in Analyzer, trim non top-level unnecessary alias is safe.
case Alias(child, _) if !isTopLevel => child
case other => other
}.orElse(resolveLiteralFunction(nameParts, u, plan)).getOrElse(u)
}
logDebug(s"Resolving $u to $result")
result
Expand Down Expand Up @@ -1958,7 +1954,6 @@ class Analyzer(override val catalogManager: CatalogManager)
assert(ordinal >= 0 && ordinal < plan.output.length)
plan.output(ordinal)
},
trimAlias = false,
throws = throws)
}

Expand All @@ -1968,28 +1963,22 @@ class Analyzer(override val catalogManager: CatalogManager)
*
* @param e The expression need to be resolved.
* @param q The LogicalPlan whose children are used to resolve expression's attribute.
* @param trimAlias When true, trim unnecessary alias of GetStructField`. Note that,
* we cannot trim the alias of top-level `GetStructField`, as we should
* resolve `UnresolvedAttribute` to a named expression. The caller side
* can trim the alias of top-level `GetStructField` if it's safe to do so.
* @return resolved Expression.
*/
def resolveExpressionByPlanChildren(
e: Expression,
q: LogicalPlan,
trimAlias: Boolean = false): Expression = {
q: LogicalPlan): Expression = {
resolveExpression(
e,
q,
resolveColumnByName = nameParts => {
q.resolveChildren(nameParts, resolver)
},
resolveColumnByOrdinal = ordinal => {
val candidates = q.children.flatMap(_.output)
assert(ordinal >= 0 && ordinal < candidates.length)
candidates.apply(ordinal)
assert(q.children.length == 1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not related to this PR, but a small followup from #31728 (comment)

assert(ordinal >= 0 && ordinal < q.children.head.output.length)
q.children.head.output(ordinal)
},
trimAlias = trimAlias,
throws = true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.Stable
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction}
import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedFunction}
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
Expand Down Expand Up @@ -80,11 +80,7 @@ class RelationalGroupedDataset protected[sql](
}
}

// Wrap UnresolvedAttribute with UnresolvedAlias, as when we resolve UnresolvedAttribute, we
// will remove intermediate Alias for ExtractValue chain, and we need to alias it again to
// make it a NamedExpression.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is wrong as we don't remove top-level aliases for aggregate expressions. It causes problems as it wraps UnresolvedAttribute with UnresolvedAlias, making it not top-level anymore. Then the alias will be removed after this patch and UnresolvedAlias generates a different name.

For nested field a.b, previously the resolved expression is Alias(GetStructField(...), "b") and the Alias is not removed. UnresolvedAlias is useless and will be simply removed. So the final output column name is b. Now we remove the Alias, and UnresolvedAlias kicks in and generates a new Alias with the name a.b, which is a behavior change.

Here I simply remove this UnresolvedAlias, to make the behavior the same as before.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this code is pretty old, 2015.

I saw alias is also used to add alias around grouping expressions, not just aggregate expressions. Seems the comment is more for the case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only for Aggregate.aggregateExpressions not Aggregate.groupingExpressions. Aggregate.aggregateExpressions can include grouping expressions, but it doesn't matter. It needs to be Seq[NamedExpression] and Spark won't remove the top-level alias in it.

private[this] def alias(expr: Expression): NamedExpression = expr match {
case u: UnresolvedAttribute => UnresolvedAlias(u)
case expr: NamedExpression => expr
case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
UnresolvedAlias(a, Some(Column.generateAlias))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ struct<ID:int,NST:string>
-- !query
SELECT ID, STRUCT(ST.C as STC, ST.D as STD).STD FROM tbl_x
-- !query schema
struct<ID:int,struct(ST.C AS C AS STC, ST.D AS D AS STD).STD:string>
struct<ID:int,struct(ST.C AS STC, ST.D AS STD).STD:string>
-- !query output
1 delta
2 eta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ class UDFSuite extends QueryTest with SharedSparkSession {
Console.withOut(outputStream) {
spark.sql("SELECT f(a._1) FROM x").show
}
assert(outputStream.toString.contains("f(a._1 AS _1)"))
assert(outputStream.toString.contains("f(a._1)"))
}
}

Expand Down