[SPARK-31670][SQL] Trim unnecessary Struct field alias in Aggregate/GroupingSets #28490
Test build #122477 has finished for PR 28490 at commit
cc @maropu Can you have a review?
def isPartOfAggregation(e: Expression): Boolean = {
aggsBuffer.exists(a => a.find(_ eq e).isDefined)
gid: Attribute): Seq[NamedExpression] = {
val resolvedGroupByAliases = groupByAliases.map(_.transformDown {
Probably, we should not fix this issue in ResolveGroupingAnalytics, but in ResolveReferences, just like this;
object ResolveReferences extends Rule[LogicalPlan] {
...
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case p: LogicalPlan if !p.childrenResolved => p
....
case a @ Aggregate(groupingExprs, aggExprs, _) if both `groupingExprs` and `aggExprs` have the same struct field =>
val newAgg = resolve expressions so that they have the same exprIds
newAgg
A root cause seems to be that ResolveReferences assigns different exprIds to `each#30.json_string AS json_string`s (#31 vs #32);
20/06/03 16:22:47 WARN HiveSessionStateBuilder$$anon$1:
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences ===
!'Aggregate [cube('a, 'get_json_object('each.json_string, $.iType))], ['a, 'coalesce('get_json_object('each.json_string, $.iType), -127) AS iType#29, unresolvedalias('sum('b), None)]
+- Generate explode(c#4), false, x, [each#30]
+- SubqueryAlias t
+- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
+- Range (0, 1, step=1, splits=Some(4))
'Aggregate [cube(a#2, 'get_json_object(each#30.json_string AS json_string#31, $.iType))], [a#2, 'coalesce('get_json_object(each#30.json_string AS json_string#32, $.iType), -127) AS iType#29, unresolvedalias('sum(b#3), None)]
+- Generate explode(c#4), false, x, [each#30]
+- SubqueryAlias t
+- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
+- Range (0, 1, step=1, splits=Some(4))
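The mismatch shown in the plan (#31 vs #32) comes down to fresh id generation whenever an alias is constructed. Here is a tiny self-contained Scala model of that behavior; the classes below are illustrative stand-ins, not the real Catalyst `Alias`/`GetStructField`:

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy model (not Catalyst code): like NamedExpression.newExprId,
// every alias gets a fresh id when it is constructed.
object IdGen {
  private val cur = new AtomicLong(0)
  def next(): Long = cur.getAndIncrement()
}

// Stand-ins for GetStructField and Alias.
final case class FieldRef(struct: String, field: String)
final case class AliasRef(child: FieldRef, name: String, id: Long = IdGen.next())

object Demo extends App {
  // The same `each.json_string` access is resolved twice: once in the
  // grouping expressions and once in the aggregate expressions.
  val inGroupBy = AliasRef(FieldRef("each", "json_string"), "json_string")
  val inAggList = AliasRef(FieldRef("each", "json_string"), "json_string")
  assert(inGroupBy.child == inAggList.child) // children are structurally equal
  assert(inGroupBy.id != inAggList.id)       // ...but the ids differ (#31 vs #32)
}
```

An id-based lookup, which is effectively what `ResolveGroupingAnalytics.constructAggregateExprs()` relies on, then fails to match the two occurrences.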
cc: @cloud-fan @viirya
Based on the above analysis, it seems this is caused by ResolveReferences instead of ResolveGroupingAnalytics? Can we possibly have a reproducible case without CUBE?
Based on the above analysis, it seems this is caused by ResolveReferences instead of ResolveGroupingAnalytics? Can we possibly have a reproducible case without CUBE?

This happens with CUBE when constructing the CUBE LogicalPlan.
The query plan shown above seems to be from before ResolveGroupingAnalytics? So is it possible to encounter a similar issue without CUBE? @maropu
A root cause seems to be that ResolveReferences assigns different exprIds to `each#30.json_string AS json_string`s (#31 vs #32);

Yea, GetStructField will be constructed with an alias in ResolveReferences and get a different ExprId.
Probably, we should not fix this issue in ResolveGroupingAnalytics, but in ResolveReferences, just like this;
object ResolveReferences extends Rule[LogicalPlan] {
...
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case p: LogicalPlan if !p.childrenResolved => p
....
case a @ Aggregate(groupingExprs, aggExprs, _) if both `groupingExprs` and `aggExprs` have the same struct field =>
val newAgg = resolve expressions so that they have the same exprIds
newAgg

Also the HAVING clause may have this problem. I didn't think through enough cases.
The shown query plan above seems before ResolveGroupingAnalytics? So without CUBE is it possible to encounter similar issue? @maropu
Yea, all the cases have the same issue;
scala> spark.range(1).selectExpr("'x' AS a", "1 AS b", "array(named_struct('row_id', 1, 'json_string', 'y')) AS c").createOrReplaceTempView("t")
// ROLLUP
scala> sql("""
| select a, coalesce(get_json_object(each.json_string,'$.iType'),'-127') as iType, sum(b)
| from t
| LATERAL VIEW explode(c) x AS each
| group by a, get_json_object(each.json_string,'$.iType')
| with rollup
| """).show()
org.apache.spark.sql.AnalysisException: expression 'x.`each`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [a#17, get_json_object(each#9.json_string AS json_string#10, $.iType)#18, spark_grouping_id#16L], [a#17, coalesce(get_json_object(each#9.json_string, $.iType), -127) AS iType#8, sum(cast(b#3 as bigint)) AS sum(b)#13L]
+- Expand [ArrayBuffer(a#2, b#3, c#4, each#9, a#14, get_json_object(each#9.json_string AS json_string#10, $.iType)#15, 0), ArrayBuffer(a#2, b#3, c#4, each#9, a#14, null, 1), ArrayBuffer(a#2, b#3, c#4, each#9, null, null, 3)], [a#2, b#3, c#4, each#9, a#17, get_json_object(each#9.json_string AS json_string#10, $.iType)#18, spark_grouping_id#16L]
+- Project [a#2, b#3, c#4, each#9, a#2 AS a#14, get_json_object(each#9.json_string, $.iType) AS get_json_object(each#9.json_string AS json_string#10, $.iType)#15]
+- Generate explode(c#4), false, x, [each#9]
+- SubqueryAlias t
+- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
+- Range (0, 1, step=1, splits=Some(4))
// GROUPING SETS
scala> sql("""
| select a, coalesce(get_json_object(each.json_string,'$.iType'),'-127') as iType, sum(b)
| from t
| LATERAL VIEW explode(c) x AS each
| group by grouping sets((a, get_json_object(each.json_string,'$.iType')))
| """).show()
org.apache.spark.sql.AnalysisException: expression 'x.`each`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [a#28, get_json_object(each#20.json_string AS json_string#21, $.iType)#29, spark_grouping_id#27L], [a#28, coalesce(get_json_object(each#20.json_string, $.iType), -127) AS iType#19, sum(cast(b#3 as bigint)) AS sum(b)#24L]
+- Expand [ArrayBuffer(a#2, b#3, c#4, each#20, a#25, get_json_object(each#20.json_string AS json_string#21, $.iType)#26, 0)], [a#2, b#3, c#4, each#20, a#28, get_json_object(each#20.json_string AS json_string#21, $.iType)#29, spark_grouping_id#27L]
+- Project [a#2, b#3, c#4, each#20, a#2 AS a#25, get_json_object(each#20.json_string, $.iType) AS get_json_object(each#20.json_string AS json_string#21, $.iType)#26]
+- Generate explode(c#4), false, x, [each#20]
+- SubqueryAlias t
+- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
+- Range (0, 1, step=1, splits=Some(4))
@@ -3495,6 +3495,59 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
assert(df4.schema.head.name === "randn(1)")
checkIfSeedExistsInExplain(df2)
}
test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
withTable("t1") {
nit: `t1` -> `t`
|c array<struct<row_id:int,json_string:string>>,
|d array<array<string>>,
|e array<map<string, int>>)
|using orc""".stripMargin)
Please use a temp view for test performance. Also, it's better to add some rows for answer checks to this test table instead of the current all-null table.
checkAnswer(
sql(
"""
|select a, each.json_string, sum(b)
nit: Could you use uppercase for the SQL keywords where possible?
|from t1
|LATERAL VIEW explode(c) x AS each
|group by a, each.json_string
|with cube
Could you check the other analytics grouping, too, e.g., GROUPING SETS and ROLLUP?
Could you check the other analytics grouping, too, e.g., GROUPING SETS and ROLLUP?

Hmm, GROUPING SETS still has the problem.
Test build #123516 has finished for PR 28490 at commit
Test build #123589 has finished for PR 28490 at commit
@@ -510,10 +510,12 @@ class Analyzer(
// collect all the found AggregateExpression, so we can check an expression is part of
// any AggregateExpression or not.
val aggsBuffer = ArrayBuffer[Expression]()
nit: please revert the unnecessary changes. (Unrelated changes might lead to revert/backport failures sometimes...)
nit: please revert the unnecessary changes. (Unrelated changes might lead to revert/backport failures sometimes...)

Sorry..., I forgot to check the diff.
|c ARRAY<STRUCT<row_id:INT,json_string:STRING>>,
|d ARRAY<ARRAY<STRING>>,
|e ARRAY<MAP<STRING, INT>>)
|USING ORC""".stripMargin)
See: #28490 (comment)
See: #28490 (comment)

Changed the test case to use a view with actual data.
@@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
checkIfSeedExistsInExplain(df2)
}

test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
Could you please make the title more correct? I think we don't need the words `with CUBE`.
Could you please make the title more correct? I think we don't need the words `with CUBE`.

Is the current PR title OK?
Yea, looks fine.
|LATERAL VIEW EXPLODE(c) X AS each
|GROUP BY a, each.json_string
|GROUPING SETS((a),(a, each.json_string))
|""".stripMargin), Nil)
Could you add tests having queries with HAVING clauses?
Could you add tests having queries with HAVING clauses?

Seems I made a mistake; HAVING won't hit the duplicate-field issue, since grouping keys won't appear in the HAVING condition.
"""
|SELECT a, each.json_string AS json_string, SUM(b)
|FROM t
|LATERAL VIEW EXPLODE(c) x AS each
btw, do we really need `lateral view` to reproduce this issue? I mean, this issue cannot happen without `lateral view`?
btw, do we really need `lateral view` to reproduce this issue? I mean, this issue cannot happen without `lateral view`?

No, I changed it.
case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
q.mapExpressions(resolveExpressionTopDown(_, q))
}

def needResolveStructField(plan: LogicalPlan): Boolean = {
private
}
}

def containSameStructFields(
private
Is it not enough just to check if both sides (grpExprs and aggExprs) have struct fields here? Do we need to confirm their identity by using unresolved attributes?
Is it not enough just to check if both sides (grpExprs and aggExprs) have struct fields here? Do we need to confirm their identity by using unresolved attributes?

Yea, here the attributes are still unresolved.
Is it not enough just to check if both sides (grpExprs and aggExprs) have struct fields here?

GroupingSets needs to check the selected columns too, so all of them need to be checked.
I think it's better to avoid comparing unresolved attributes... could we resolve them and then detect the mismatched exprIds?
I think it's better to avoid comparing unresolved attributes... could we resolve them and then detect the mismatched exprIds?

How about the current version? No check, just resolve.
}

def containSameStructFields(
grpExprs: Seq[Attribute],
nit: `grpExprs` -> `groupExprs` for consistency.
nit: `grpExprs` -> `groupExprs` for consistency.

Done
case p: LogicalPlan if needResolveStructField(p) =>
logTrace(s"Attempting to resolve ${p.simpleString(SQLConf.get.maxToStringFields)}")
val resolved = p.mapExpressions(resolveExpressionTopDown(_, p))
val structFieldMap = new mutable.HashMap[String, Alias]
`mutable.HashMap` -> `mutable.Map`
`mutable.HashMap` -> `mutable.Map`

Done
Test build #123597 has finished for PR 28490 at commit
Test build #123595 has finished for PR 28490 at commit
retest this please
@@ -1259,6 +1259,11 @@ class Analyzer(
attr.withExprId(exprId)
}

private def dedupStructField(attr: Alias, structFieldMap: Map[String, Attribute]) = {
Not used now?
Not used now?

Oh, yea.
@@ -1481,7 +1486,35 @@ class Analyzer(

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
Could you write this handling in an independent pattern like this?
case agg @ (_: Aggregate | _: GroupingSets) =>
val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
val structFieldMap = mutable.Map[String, Alias]()
resolved.transformExpressionsDown {
// Plz describe some comments why we need this handling...
case a @ Alias(struct: GetStructField, _) =>
if (structFieldMap.contains(struct.sql)) {
val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
} else {
structFieldMap += (struct.sql -> a)
a
}
case e => e
}
case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
q.mapExpressions(resolveExpressionTopDown(_, q))
w/ some code cleanup;
case agg @ (_: Aggregate | _: GroupingSets) =>
val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
val hasStructField = resolved.expressions.exists {
_.collectFirst { case gsf: GetStructField => gsf }.isDefined
}
if (hasStructField) {
// Plz describe some comments why we need this handling...
val structFieldMap = mutable.Map[String, Alias]()
resolved.transformExpressionsDown {
case a @ Alias(struct: GetStructField, _) =>
if (structFieldMap.contains(struct.sql)) {
val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
} else {
structFieldMap += (struct.sql -> a)
a
}
}
} else {
resolved
}
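The `structFieldMap` idea in the snippet above can be modeled outside Catalyst with a small self-contained sketch. All names below are illustrative stand-ins for the real Catalyst classes; the point is only that the first alias seen for a given struct access "wins" and later occurrences are rewritten to reuse its id:

```scala
import scala.collection.mutable

// Stand-in for Alias: `sql` is the textual form of the struct access,
// `id` plays the role of the exprId.
final case class AliasRef(sql: String, name: String, id: Long)

// The first alias seen for each struct access keeps its id; later
// occurrences are rewritten to reuse it, like structFieldMap above.
def dedupByFirstSeen(aliases: Seq[AliasRef]): Seq[AliasRef] = {
  val firstSeen = mutable.Map[String, Long]()
  aliases.map { a =>
    val id = firstSeen.getOrElseUpdate(a.sql, a.id)
    a.copy(id = id)
  }
}

val deduped = dedupByFirstSeen(Seq(
  AliasRef("each.json_string", "json_string", 31),
  AliasRef("each.json_string", "json_string", 32)))
assert(deduped.map(_.id) == Seq(31L, 31L)) // both occurrences now share id 31
```

Once every occurrence of the same struct access carries the same id, an id-based replacement over the grouping and aggregate expression lists can match them.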
w/ some code cleanup;

Done
Could you check this? @cloud-fan @viirya
Test build #123603 has finished for PR 28490 at commit
Test build #123605 has finished for PR 28490 at commit
Test build #127288 has finished for PR 28490 at commit
// trim Alias over top-level GetStructField
case Alias(s: GetStructField, _) => s
case other => other
}
I think we can add a comment explaining that, because these expressions are not named expressions originally, we can safely trim the top-level Alias.
I think we can add a comment explaining that, because these expressions are not named expressions originally, we can safely trim the top-level Alias.

Extracted this part as a function and added a comment. cc @cloud-fan
val result = resolved match {
case Alias(s: GetStructField, _) if trimAlias && !isTopLevel => s
case others => others
}
Actually, I'm wondering if there are any cases where we don't want to trim a nested (i.e., non-top-level) Alias? Such an Alias is useless and could possibly cause unexpected issues.
Actually, I'm wondering if there are any cases where we don't want to trim a nested (i.e., non-top-level) Alias? Such an Alias is useless and could possibly cause unexpected issues.

Since we call CleanupAliases later in the Analyzer, this field will be trimmed anyway; but if we don't handle it here, we can't pass ResolveGroupingAnalytics.constructAggregateExprs().
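The trimming discussed in this thread can be illustrated with a self-contained sketch (the case classes below are stand-ins, not the real Catalyst `Alias`/`GetStructField`): once the wrapping alias is trimmed, both sides reduce to the same struct-field node and can be matched structurally instead of by exprId.

```scala
// Stand-ins, not Catalyst classes.
final case class FieldRef(struct: String, field: String)
sealed trait Expr
final case class Get(ref: FieldRef) extends Expr
final case class Alias(child: Expr, name: String, id: Long) extends Expr

// Trimming the alias reduces both sides to the same GetStructField-like
// node, so they can be compared structurally rather than by exprId.
def trimStructFieldAlias(e: Expr): Expr = e match {
  case Alias(g: Get, _, _) => g
  case other               => other
}

val grp = Alias(Get(FieldRef("each", "json_string")), "json_string", 31)
val agg = Alias(Get(FieldRef("each", "json_string")), "json_string", 32)
assert(grp != agg)                                             // exprIds differ
assert(trimStructFieldAlias(grp) == trimStructFieldAlias(agg)) // equal once trimmed
```

This is why trimming only non-top-level aliases is safe here: the expression was not a named expression to begin with, and CleanupAliases would drop the alias later anyway.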
As we don't merge this yet, can you also add a comment here?
As we don't merge this yet, can you also add a comment here?

Yea
As we don't merge this yet, can you also add a comment here?

See the comment I added just now. cc @cloud-fan
Just minor comments, otherwise LGTM.
Thanks for updating the PR description. It looks better now.
Test build #128151 has finished for PR 28490 at commit
Test build #128170 has finished for PR 28490 at commit
// names leading to ambiguous references exception.
case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
case a: Aggregate =>
We need a high-level comment to explain why we trim the alias here, e.g.
// SPARK-31670: ...
We need a high-level comment to explain why we trim the alias here, e.g.
// SPARK-31670: ...

Done
Test build #128184 has finished for PR 28490 at commit
retest this please
Test build #128181 has finished for PR 28490 at commit
Test build #128188 has finished for PR 28490 at commit
retest this please
Test build #128195 has finished for PR 28490 at commit
thanks, merging to master!
// names leading to ambiguous references exception.
case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
// SPARK-31607: Resolve Struct field in groupByExpressions and aggregateExpressions
In the comment, SPARK-31607 seems to be a typo of SPARK-31670, because SPARK-31607 is "Improve the perf of CTESubstitution".
In the comment, SPARK-31607 seems to be a typo of SPARK-31670, because SPARK-31607 is "Improve the perf of CTESubstitution".

Yea, sorry for my mistake.
Never mind~, @AngersZhuuuu . :)
### What changes were proposed in this pull request?
This PR fixes incorrect JIRA ids in `Analyzer.scala` introduced by SPARK-31670 (#28490)
```scala
- // SPARK-31607: Resolve Struct field in selectedGroupByExprs/groupByExprs and aggregations
+ // SPARK-31670: Resolve Struct field in selectedGroupByExprs/groupByExprs and aggregations
```
### Why are the changes needed?
Fix the wrong information.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This is a comment change. Manually review.
Closes #30269 from dongjoon-hyun/SPARK-31670-MINOR.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
…ate UnresolvedAlias
### What changes were proposed in this pull request?
This PR partially backports #31758 to 3.1, to fix a backward compatibility issue caused by #28490.
The query below has different output schemas in 3.0 and 3.1:
```
sql("select struct(1, 2) as s").groupBy(col("s.col1")).agg(first("s"))
```
In 3.0 the output column name is `col1`, in 3.1 it's `s.col1`. This breaks existing queries.
In #28490, we changed the logic of resolving aggregate expressions. What happened is that the input nested column `s.col1` will become `UnresolvedAlias(s.col1, None)`. In `ResolveReference`, the logic used to directly resolve `s.col1` to `s.col1 AS col1`, but after #28490 we enter the code path with `trimAlias = true` and `!isTopLevel`, so the alias is removed, resulting in `s.col1`, which will then be resolved in `ResolveAliases` as `s.col1 AS s.col1`.
#31758 happens to fix this issue because we no longer wrap `UnresolvedAttribute` with `UnresolvedAlias` in `RelationalGroupedDataset`.
### Why are the changes needed?
Fix an unexpected query output schema change.
### Does this PR introduce _any_ user-facing change?
Yes, as explained above.
### How was this patch tested?
updated test
Closes #32239 from cloud-fan/bug.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
What changes were proposed in this pull request?
A struct field that appears both in GROUP BY and in an aggregate expression with CUBE/ROLLUP/GROUPING SETS fails during analysis.
For a struct-type field, resolution wraps it in an Alias. When the struct field is used in GROUP BY with CUBE/ROLLUP etc., the struct field in the groupByExpression and in the aggregateExpression are resolved with different exprIds, as shown below.
This makes ResolveGroupingAnalytics.constructAggregateExprs() fail to replace the aggregateExpression with the expanded groupByExpression attribute, since their exprIds are not the same; then the error happens.
Why are the changes needed?
Fix an analysis bug.
Does this PR introduce any user-facing change?
NO
How was this patch tested?
Added UT