Skip to content

Commit

Permalink
[SPARK-34639][SQL][3.1] RelationalGroupedDataset.alias should not cre…
Browse files Browse the repository at this point in the history
…ate UnresolvedAlias

### What changes were proposed in this pull request?

This PR partially backports #31758 to 3.1, to fix a backward compatibility issue caused by #28490

The query below has different output schemas in 3.0 and 3.1
```
sql("select struct(1, 2) as s").groupBy(col("s.col1")).agg(first("s"))
```

In 3.0 the output column name is `col1`, in 3.1  it's `s.col1`. This breaks existing queries.

In #28490 , we changed the logic of resolving aggregate expressions. What happened is that the input nested column `s.col1` will become `UnresolvedAlias(s.col1, None)`. In `ResolveReference`, the logic used to directly resolve `s.col` to `s.col1 AS col1` but after #28490 we enter the code path with `trimAlias = true and !isTopLevel`, so the alias is removed and resulting in `s.col1`, which will then be resolved in `ResolveAliases` as `s.col1 AS s.col1`

#31758 happens to fix this issue because we no longer wrap `UnresolvedAttribute` with `UnresolvedAlias` in `RelationalGroupedDataset`.

### Why are the changes needed?

Fix an unexpected query output schema change

### Does this PR introduce _any_ user-facing change?

Yes as explained above.

### How was this patch tested?

updated test

Closes #32239 from cloud-fan/bug.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
  • Loading branch information
cloud-fan authored and maropu committed Apr 21, 2021
1 parent 034ba76 commit 5f48abe
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.Stable
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction}
import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedFunction}
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
Expand Down Expand Up @@ -80,11 +80,7 @@ class RelationalGroupedDataset protected[sql](
}
}

// Wrap UnresolvedAttribute with UnresolvedAlias, as when we resolve UnresolvedAttribute, we
// will remove intermediate Alias for ExtractValue chain, and we need to alias it again to
// make it a NamedExpression.
private[this] def alias(expr: Expression): NamedExpression = expr match {
case u: UnresolvedAttribute => UnresolvedAlias(u)
case expr: NamedExpression => expr
case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
UnresolvedAlias(a, Some(Column.generateAlias))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3596,6 +3596,9 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
Row("A", null, 5) :: Row("A", "{\"i\": 1}", 3) :: Row("A", "{\"i\": 2}", 2) ::
Row("B", null, 1) :: Row("B", "{\"i\": 1}", 1) ::
Row("C", null, 3) :: Row("C", "{\"i\": 1}", 3) :: Nil)

assert(spark.table("t").groupBy($"c.json_string").count().schema.fieldNames ===
Seq("json_string", "count"))
}
}

Expand Down

0 comments on commit 5f48abe

Please sign in to comment.