From 5f48abeb37f284d5f64e783310ef2c9b4924a5a2 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 21 Apr 2021 09:14:11 +0900 Subject: [PATCH] [SPARK-34639][SQL][3.1] RelationalGroupedDataset.alias should not create UnresolvedAlias ### What changes were proposed in this pull request? This PR partially backports https://github.com/apache/spark/pull/31758 to 3.1, to fix a backward compatibility issue caused by https://github.com/apache/spark/pull/28490 The query below has different output schemas in 3.0 and 3.1 ``` sql("select struct(1, 2) as s").groupBy(col("s.col1")).agg(first("s")) ``` In 3.0 the output column name is `col1`, in 3.1 it's `s.col1`. This breaks existing queries. In https://github.com/apache/spark/pull/28490 , we changed the logic of resolving aggregate expressions. What happened is that the input nested column `s.col1` will become `UnresolvedAlias(s.col1, None)`. In `ResolveReference`, the logic used to directly resolve `s.col` to `s.col1 AS col1` but after #28490 we enter the code path with `trimAlias = true and !isTopLevel`, so the alias is removed and resulting in `s.col1`, which will then be resolved in `ResolveAliases` as `s.col1 AS s.col1` #31758 happens to fix this issue because we no longer wrap `UnresolvedAttribute` with `UnresolvedAlias` in `RelationalGroupedDataset`. ### Why are the changes needed? Fix an unexpected query output schema change ### Does this PR introduce _any_ user-facing change? Yes as explained above. ### How was this patch tested? updated test Closes #32239 from cloud-fan/bug. Authored-by: Wenchen Fan Signed-off-by: Takeshi Yamamuro --- .../org/apache/spark/sql/RelationalGroupedDataset.scala | 6 +----- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 3 +++ 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index c40ce0f4777c6..7e735eecbac3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.Stable import org.apache.spark.api.python.PythonEvalType import org.apache.spark.broadcast.Broadcast -import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction} +import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedFunction} import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -80,11 +80,7 @@ class RelationalGroupedDataset protected[sql]( } } - // Wrap UnresolvedAttribute with UnresolvedAlias, as when we resolve UnresolvedAttribute, we - // will remove intermediate Alias for ExtractValue chain, and we need to alias it again to - // make it a NamedExpression. private[this] def alias(expr: Expression): NamedExpression = expr match { - case u: UnresolvedAttribute => UnresolvedAlias(u) case expr: NamedExpression => expr case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] => UnresolvedAlias(a, Some(Column.generateAlias)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index f88eee7b0c2b8..6df57b1918d2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3596,6 +3596,9 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark Row("A", null, 5) :: Row("A", "{\"i\": 1}", 3) :: Row("A", "{\"i\": 2}", 2) :: Row("B", null, 1) :: Row("B", "{\"i\": 1}", 1) :: Row("C", null, 3) :: Row("C", "{\"i\": 1}", 3) :: Nil) + + assert(spark.table("t").groupBy($"c.json_string").count().schema.fieldNames === + Seq("json_string", "count")) } }