
[SPARK-31670][SQL] Trim unnecessary Struct field alias in Aggregate/GroupingSets #28490

Closed
wants to merge 28 commits into from

Conversation

AngersZhuuuu
Contributor

@AngersZhuuuu AngersZhuuuu commented May 10, 2020

What changes were proposed in this pull request?

A struct field that appears both in GROUP BY and in an aggregate expression fails analysis when combined with CUBE/ROLLUP/GROUPING SETS.

test("SPARK-31670") {
  withTable("t1") {
      sql(
        """
          |CREATE TEMPORARY VIEW t(a, b, c) AS
          |SELECT * FROM VALUES
          |('A', 1, NAMED_STRUCT('row_id', 1, 'json_string', '{"i": 1}')),
          |('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 1}')),
          |('A', 2, NAMED_STRUCT('row_id', 2, 'json_string', '{"i": 2}')),
          |('B', 1, NAMED_STRUCT('row_id', 3, 'json_string', '{"i": 1}')),
          |('C', 3, NAMED_STRUCT('row_id', 4, 'json_string', '{"i": 1}'))
        """.stripMargin)

      checkAnswer(
        sql(
          """
            |SELECT a, c.json_string, SUM(b)
            |FROM t
            |GROUP BY a, c.json_string
            |WITH CUBE
            |""".stripMargin),
        Row("A", "{\"i\": 1}", 3) :: Row("A", "{\"i\": 2}", 2) :: Row("A", null, 5) ::
          Row("B", "{\"i\": 1}", 1) :: Row("B", null, 1) ::
          Row("C", "{\"i\": 1}", 3) :: Row("C", null, 3) ::
          Row(null, "{\"i\": 1}", 7) :: Row(null, "{\"i\": 2}", 2) :: Row(null, null, 9) :: Nil)

  }
}

Error 

[info] - SPARK-31670 *** FAILED *** (2 seconds, 857 milliseconds)
[info]   Failed to analyze query: org.apache.spark.sql.AnalysisException: expression 't.`c`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
[info]   Aggregate [a#247, json_string#248, spark_grouping_id#246L], [a#247, c#223.json_string AS json_string#241, sum(cast(b#222 as bigint)) AS sum(b)#243L]
[info]   +- Expand [List(a#221, b#222, c#223, a#244, json_string#245, 0), List(a#221, b#222, c#223, a#244, null, 1), List(a#221, b#222, c#223, null, json_string#245, 2), List(a#221, b#222, c#223, null, null, 3)], [a#221, b#222, c#223, a#247, json_string#248, spark_grouping_id#246L]
[info]      +- Project [a#221, b#222, c#223, a#221 AS a#244, c#223.json_string AS json_string#245]
[info]         +- SubqueryAlias t
[info]            +- Project [col1#218 AS a#221, col2#219 AS b#222, col3#220 AS c#223]
[info]               +- Project [col1#218, col2#219, col3#220]
[info]                  +- LocalRelation [col1#218, col2#219, col3#220]
[info]

When a struct field is resolved, it is wrapped in an Alias. When a struct field appears in GROUP BY with CUBE/ROLLUP etc., the occurrences in groupByExpressions and aggregateExpressions are resolved into aliases with different exprIds, as below:

'Aggregate [cube(a#221, c#223.json_string AS json_string#240)], [a#221, c#223.json_string AS json_string#241, sum(cast(b#222 as bigint)) AS sum(b)#243L]
+- SubqueryAlias t
   +- Project [col1#218 AS a#221, col2#219 AS b#222, col3#220 AS c#223]
      +- Project [col1#218, col2#219, col3#220]
         +- LocalRelation [col1#218, col2#219, col3#220]

This makes ResolveGroupingAnalytics.constructAggregateExprs() fail to replace the aggregate expression with the Expand's groupByExpression attribute, since their exprIds differ, and the error above is raised.
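A minimal, self-contained sketch of the mismatch, using hypothetical toy case classes (not Spark's actual Catalyst types): each resolution of the same struct-field reference mints a fresh exprId, so exprId-based substitution fails even though the alias-trimmed expressions are identical.

```scala
// Toy stand-ins for Catalyst's GetStructField/Alias; names are illustrative only.
object ExprIdMismatchSketch {
  private var counter = 0L
  private def newExprId(): Long = { counter += 1; counter }

  sealed trait Expr
  case class GetStructField(struct: String, field: String) extends Expr
  // Resolution wraps the extraction in an Alias carrying a *fresh* exprId.
  case class Alias(child: Expr, name: String, exprId: Long) extends Expr

  def resolve(struct: String, field: String): Alias =
    Alias(GetStructField(struct, field), field, newExprId())

  def main(args: Array[String]): Unit = {
    // The GROUP BY side and the aggregate side each resolve `c.json_string` independently.
    val groupBySide = resolve("c", "json_string")
    val aggSide     = resolve("c", "json_string")

    // Matching by exprId fails, which is what breaks constructAggregateExprs()...
    assert(groupBySide.exprId != aggSide.exprId)
    // ...while the alias-trimmed expressions are structurally equal, which is
    // why trimming the unnecessary Alias makes the substitution succeed.
    assert(groupBySide.child == aggSide.child)
  }
}
```

Comparing the alias-trimmed `child` instead of the aliased node is, in essence, what trimming the unnecessary Alias buys the analyzer.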

Why are the changes needed?

Fixes the analysis bug described above.

Does this PR introduce any user-facing change?

NO

How was this patch tested?

Added UT

@SparkQA

SparkQA commented May 10, 2020

Test build #122477 has finished for PR 28490 at commit 27c495b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu AngersZhuuuu changed the title [SPARK-31670][WIP]Struct Field in groupByExpr with CUBE [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE May 21, 2020
@AngersZhuuuu
Contributor Author

cc @maropu Could you take a look?

def isPartOfAggregation(e: Expression): Boolean = {
aggsBuffer.exists(a => a.find(_ eq e).isDefined)
gid: Attribute): Seq[NamedExpression] = {
val resolvedGroupByAliases = groupByAliases.map(_.transformDown {
maropu (Member) commented Jun 3, 2020

Probably, we should not fix this issue in ResolveGroupingAnalytics, but in ResolveReferences, just like this:

object ResolveReferences extends Rule[LogicalPlan] {
    ...
    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
      case p: LogicalPlan if !p.childrenResolved => p
      ....

      case a @ Aggregate(groupingExprs, aggExprs, _) if both `groupingExprs` and `aggExprs` have the same struct field =>
       val newAgg = resolve expressions so that they have the same exprIds
       newAgg

A root cause seems to be that ResolveReferences assigns different exprIds to each#30.json_string AS json_strings (#31 vs #32);

20/06/03 16:22:47 WARN HiveSessionStateBuilder$$anon$1: 
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences ===
!'Aggregate [cube('a, 'get_json_object('each.json_string, $.iType))], ['a, 'coalesce('get_json_object('each.json_string, $.iType), -127) AS iType#29, unresolvedalias('sum('b), None)]
   +- Generate explode(c#4), false, x, [each#30]
      +- SubqueryAlias t
         +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
            +- Range (0, 1, step=1, splits=Some(4))

'Aggregate [cube(a#2, 'get_json_object(each#30.json_string AS json_string#31, $.iType))], [a#2, 'coalesce('get_json_object(each#30.json_string AS json_string#32, $.iType), -127) AS iType#29, unresolvedalias('sum(b#3), None)]
   +- Generate explode(c#4), false, x, [each#30]
      +- SubqueryAlias t
         +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
            +- Range (0, 1, step=1, splits=Some(4))

Member

Based on the above analysis, it seems this is caused by ResolveReferences instead of ResolveGroupingAnalytics? Can we possibly have a reproducible case without CUBE?

Contributor Author

Based on the above analysis, it seems this is caused by ResolveReferences instead of ResolveGroupingAnalytics? Can we possibly have a reproducible case without CUBE?

This happens with CUBE, when the CUBE LogicalPlan is constructed.

Member

The query plan shown above seems to be from before ResolveGroupingAnalytics? So is it possible to hit a similar issue without CUBE? @maropu

Contributor Author

A root cause seems to be that ResolveReferences assigns different exprIds to each#30.json_string AS json_strings (#31 vs #32);

Yea, GetStructField gets wrapped in an Alias in ResolveReferences and receives a different exprId each time.

Contributor Author

Probably, we should not fix this issue in ResolveGroupingAnalytics, but in ResolveReferences, just like this:

object ResolveReferences extends Rule[LogicalPlan] {
    ...
    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
      case p: LogicalPlan if !p.childrenResolved => p
      ....

      case a @ Aggregate(groupingExprs, aggExprs, _) if both `groupingExprs` and `aggExprs` have the same struct field =>
       val newAgg = resolve expressions so that they have the same exprIds
       newAgg

The HAVING clause may also have this problem. I hadn't thought through enough cases.

Member

The query plan shown above seems to be from before ResolveGroupingAnalytics? So is it possible to hit a similar issue without CUBE? @maropu

Yea, all the cases have the same issue;

scala> spark.range(1).selectExpr("'x' AS a", "1 AS b", "array(named_struct('row_id', 1, 'json_string', 'y')) AS c").createOrReplaceTempView("t")

// ROLLUP
scala> sql("""
     |   select a, coalesce(get_json_object(each.json_string,'$.iType'),'-127') as iType, sum(b)
     |   from t
     |   LATERAL VIEW explode(c) x AS each
     |   group by a, get_json_object(each.json_string,'$.iType')
     |   with rollup
     | """).show()
org.apache.spark.sql.AnalysisException: expression 'x.`each`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [a#17, get_json_object(each#9.json_string AS json_string#10, $.iType)#18, spark_grouping_id#16L], [a#17, coalesce(get_json_object(each#9.json_string, $.iType), -127) AS iType#8, sum(cast(b#3 as bigint)) AS sum(b)#13L]
+- Expand [ArrayBuffer(a#2, b#3, c#4, each#9, a#14, get_json_object(each#9.json_string AS json_string#10, $.iType)#15, 0), ArrayBuffer(a#2, b#3, c#4, each#9, a#14, null, 1), ArrayBuffer(a#2, b#3, c#4, each#9, null, null, 3)], [a#2, b#3, c#4, each#9, a#17, get_json_object(each#9.json_string AS json_string#10, $.iType)#18, spark_grouping_id#16L]
   +- Project [a#2, b#3, c#4, each#9, a#2 AS a#14, get_json_object(each#9.json_string, $.iType) AS get_json_object(each#9.json_string AS json_string#10, $.iType)#15]
      +- Generate explode(c#4), false, x, [each#9]
         +- SubqueryAlias t
            +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
               +- Range (0, 1, step=1, splits=Some(4))

// GROUPING SETS
scala> sql("""
     |   select a, coalesce(get_json_object(each.json_string,'$.iType'),'-127') as iType, sum(b)
     |   from t
     |   LATERAL VIEW explode(c) x AS each
     |   group by grouping sets((a, get_json_object(each.json_string,'$.iType')))
     | """).show()
org.apache.spark.sql.AnalysisException: expression 'x.`each`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [a#28, get_json_object(each#20.json_string AS json_string#21, $.iType)#29, spark_grouping_id#27L], [a#28, coalesce(get_json_object(each#20.json_string, $.iType), -127) AS iType#19, sum(cast(b#3 as bigint)) AS sum(b)#24L]
+- Expand [ArrayBuffer(a#2, b#3, c#4, each#20, a#25, get_json_object(each#20.json_string AS json_string#21, $.iType)#26, 0)], [a#2, b#3, c#4, each#20, a#28, get_json_object(each#20.json_string AS json_string#21, $.iType)#29, spark_grouping_id#27L]
   +- Project [a#2, b#3, c#4, each#20, a#2 AS a#25, get_json_object(each#20.json_string, $.iType) AS get_json_object(each#20.json_string AS json_string#21, $.iType)#26]
      +- Generate explode(c#4), false, x, [each#20]
         +- SubqueryAlias t
            +- Project [x AS a#2, 1 AS b#3, array(named_struct(row_id, 1, json_string, y)) AS c#4]
               +- Range (0, 1, step=1, splits=Some(4))

@@ -3495,6 +3495,59 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
assert(df4.schema.head.name === "randn(1)")
checkIfSeedExistsInExplain(df2)
}

test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
withTable("t1") {
Member

nit: t1 -> t

|c array<struct<row_id:int,json_string:string>>,
|d array<array<string>>,
|e array<map<string, int>>)
|using orc""".stripMargin)
maropu (Member) commented Jun 3, 2020

Please use a temp view for test performance. Also, it's better to add some rows to this test table for answer checks, instead of the current empty table.

checkAnswer(
sql(
"""
|select a, each.json_string, sum(b)
Member

nit: Could you use uppercase for the SQL keywords where possible?

|from t1
|LATERAL VIEW explode(c) x AS each
|group by a, each.json_string
|with cube
Member

Could you check the other analytics grouping, too, e.g., GROUPING SETS and ROLLUP?

Contributor Author

Could you check the other analytics grouping, too, e.g., GROUPING SETS and ROLLUP?

Hmm, GROUPING SETS still has the problem.

@SparkQA

SparkQA commented Jun 4, 2020

Test build #123516 has finished for PR 28490 at commit 4c0b04c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 6, 2020

Test build #123589 has finished for PR 28490 at commit c4ff823.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -510,10 +510,12 @@ class Analyzer(
// collect all the found AggregateExpression, so we can check an expression is part of
// any AggregateExpression or not.
val aggsBuffer = ArrayBuffer[Expression]()

maropu (Member) commented Jun 7, 2020

nit: please revert the unnecessary changes. (Unrelated changes can sometimes lead to revert/backport failures...)

Contributor Author

nit: please revert the unnecessary changes. (Unrelated changes can sometimes lead to revert/backport failures...)

Sorry..., I forgot to check the diff.

|c ARRAY<STRUCT<row_id:INT,json_string:STRING>>,
|d ARRAY<ARRAY<STRING>>,
|e ARRAY<MAP<STRING, INT>>)
|USING ORC""".stripMargin)
Contributor Author

See: #28490 (comment)

Test case changed to use a temp view with actual data.

@@ -3496,6 +3496,88 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
checkIfSeedExistsInExplain(df2)
}

test("SPARK-31670: Struct Field in groupByExpr with CUBE") {
Member

Could you please make the title more correct? I think we don't need the word with CUBE.

Contributor Author

Could you please make the title more correct? I think we don't need the word with CUBE.

Is the current PR title OK?

Member

Yea, looks fine.

|LATERAL VIEW EXPLODE(c) X AS each
|GROUP BY a, each.json_string
|GROUPING sets((a),(a, each.json_string))
|""".stripMargin), Nil)
Member

Could you add tests having queries with HAVING clauses?

Contributor Author

Could you add tests having queries with HAVING clauses?

It seems I made a mistake: HAVING won't hit the duplicate-field issue, since the HAVING condition doesn't contain grouping keys.

"""
|SELECT a, each.json_string AS json_string, SUM(b)
|FROM t
|LATERAL VIEW EXPLODE(c) x AS each
Member

btw, do we need LATERAL VIEW to reproduce this issue? I mean, can this issue happen without LATERAL VIEW?

Contributor Author

btw, do we need LATERAL VIEW to reproduce this issue? I mean, can this issue happen without LATERAL VIEW?

No, it isn't required; I've changed the test.

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
q.mapExpressions(resolveExpressionTopDown(_, q))
}

def needResolveStructField(plan: LogicalPlan): Boolean = {
Member

private

}
}

def containSameStructFields(
Member

private

Member

Is it not enough to just check whether both sides (grpExprs and aggExprs) have struct fields here? Do we need to confirm the identity by using unresolved attributes?

Contributor Author

Is it not enough to just check whether both sides (grpExprs and aggExprs) have struct fields here? Do we need to confirm the identity by using unresolved attributes?

Yea, the attributes here are still unresolved attributes.

Contributor Author

Is it not enough to just check whether both sides (grpExprs and aggExprs) have struct fields here?

GroupingSets needs to check the selected columns too, so all of them need to be checked.

Member

I think it's better to avoid comparing unresolved attributes... could we resolve them and then detect the mismatched exprIds?

Contributor Author

I think it's better to avoid comparing unresolved attributes... could we resolve them and then detect the mismatched exprIds?

How about the current version? No check, just resolve.

}

def containSameStructFields(
grpExprs: Seq[Attribute],
Member

nit: grpExprs -> groupExprs for consistency.

Contributor Author

nit: grpExprs -> groupExprs for consistency.

Done

case p: LogicalPlan if needResolveStructField(p) =>
logTrace(s"Attempting to resolve ${p.simpleString(SQLConf.get.maxToStringFields)}")
val resolved = p.mapExpressions(resolveExpressionTopDown(_, p))
val structFieldMap = new mutable.HashMap[String, Alias]
Member

mutable.HashMap -> mutable.Map

Contributor Author

mutable.HashMap -> mutable.Map

Done

@AngersZhuuuu AngersZhuuuu changed the title [SPARK-31670][SQL]Struct Field in groupByExpr with CUBE [SPARK-31670][SQL]Resolve Struct Field in Aggregate with same ExprId Jun 7, 2020
@AngersZhuuuu AngersZhuuuu changed the title [SPARK-31670][SQL]Resolve Struct Field in Aggregate with same ExprId [SPARK-31670][SQL]Resolve Struct Field in Grouping Aggregate with same ExprId Jun 7, 2020
@SparkQA

SparkQA commented Jun 7, 2020

Test build #123597 has finished for PR 28490 at commit 1ee0542.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 7, 2020

Test build #123595 has finished for PR 28490 at commit e28b084.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Jun 7, 2020

retest this please

@@ -1259,6 +1259,11 @@ class Analyzer(
attr.withExprId(exprId)
}

private def dedupStructField(attr: Alias, structFieldMap: Map[String, Attribute]) = {
Member

Not used now?

Contributor Author

Not used now?

Oh, yea

@@ -1481,7 +1486,35 @@ class Analyzer(

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
Member

Could you write this handling as an independent pattern, like this?

      case agg @ (_: Aggregate | _: GroupingSets) =>
        val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
        val structFieldMap = mutable.Map[String, Alias]()
        resolved.transformExpressionsDown {
          // Please add a comment describing why we need this handling...
          case a @ Alias(struct: GetStructField, _) =>
            if (structFieldMap.contains(struct.sql)) {
              val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
              Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
            } else {
              structFieldMap += (struct.sql -> a)
              a
            }
          case e => e
        }

      case q: LogicalPlan =>
        logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
        q.mapExpressions(resolveExpressionTopDown(_, q))

Member

w/ some code cleanup;

      case agg @ (_: Aggregate | _: GroupingSets) =>
        val resolved = agg.mapExpressions(resolveExpressionTopDown(_, agg))
        val hasStructField = resolved.expressions.exists {
          _.collectFirst { case gsf: GetStructField => gsf }.isDefined
        }
        if (hasStructField) {
          // Please add a comment describing why we need this handling...
          val structFieldMap = mutable.Map[String, Alias]()
          resolved.transformExpressionsDown {
            case a @ Alias(struct: GetStructField, _) =>
              if (structFieldMap.contains(struct.sql)) {
                val exprId = structFieldMap.getOrElse(struct.sql, a).exprId
                Alias(a.child, a.name)(exprId, a.qualifier, a.explicitMetadata)
              } else {
                structFieldMap += (struct.sql -> a)
                a
              }
          }
        } else {
          resolved
        }
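The dedup idea in the suggestion above can be shown runnable and self-contained with toy case classes (hypothetical stand-ins, not Spark's Catalyst classes): keep the first Alias seen per struct-field SQL string, and rewrite later duplicates to reuse its exprId.

```scala
import scala.collection.mutable

object DedupStructFieldAliasSketch {
  // Toy stand-ins for Catalyst nodes; `sql` plays the role of struct.sql above.
  case class GetStructField(sql: String)
  case class Alias(child: GetStructField, name: String, exprId: Long)

  // Rewrite duplicate struct-field aliases to share the first-seen exprId.
  def dedup(aliases: Seq[Alias]): Seq[Alias] = {
    val structFieldMap = mutable.Map[String, Alias]()
    aliases.map { a =>
      structFieldMap.get(a.child.sql) match {
        case Some(first) => a.copy(exprId = first.exprId)
        case None =>
          structFieldMap(a.child.sql) = a
          a
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the #31 vs #32 situation from the analysis earlier in the thread.
    val in = Seq(
      Alias(GetStructField("c.json_string"), "json_string", exprId = 31L),
      Alias(GetStructField("c.json_string"), "json_string", exprId = 32L))
    assert(dedup(in).map(_.exprId) == Seq(31L, 31L))
  }
}
```

Keying the map on the expression's SQL text, as the suggestion does with `struct.sql`, makes two occurrences of the same struct field compare equal regardless of their exprIds.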

Contributor Author

w/ some code cleanup;

Done

@maropu
Member

maropu commented Jun 7, 2020

Could you check this? @cloud-fan @viirya

@SparkQA

SparkQA commented Jun 7, 2020

Test build #123603 has finished for PR 28490 at commit 1ee0542.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 7, 2020

Test build #123605 has finished for PR 28490 at commit 0af3166.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 10, 2020

Test build #127288 has finished for PR 28490 at commit 0af3166.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Comment on lines 1460 to 1463
// trim Alias over top-level GetStructField
case Alias(s: GetStructField, _) => s
case other => other
}
Member

I think we can add a comment explaining that, because these expressions are not named expressions originally, we can safely trim the top-level Alias.

Contributor Author

I think we can add a comment explaining that, because these expressions are not named expressions originally, we can safely trim the top-level Alias.

Extracted this part as a function and added a comment. cc @cloud-fan

Comment on lines 1354 to 1357
val result = resolved match {
case Alias(s: GetStructField, _) if trimAlias && !isTopLevel => s
case others => others
}
Member

Actually I'm wondering if there are any cases where we don't want to trim a nested (i.e., non-top-level) Alias? Such an Alias is useless and could possibly cause unexpected issues.

Contributor Author

Actually I'm wondering if there are any cases where we don't want to trim a nested (i.e., non-top-level) Alias? Such an Alias is useless and could possibly cause unexpected issues.

Since we call CleanupAliases later in the Analyzer, this alias will be trimmed anyway, but if we don't handle it here, we can't pass ResolveGroupingAnalytics.constructAggregateExprs().
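The `trimAlias && !isTopLevel` rule under discussion can be sketched with hypothetical toy types (not the real Catalyst classes): a top-level Alias names an output column and is kept, while a nested Alias over a struct-field extraction carries no useful name and is dropped, so attribute matching downstream can succeed.

```scala
object TrimNestedAliasSketch {
  sealed trait Expr
  case class GetStructField(struct: String, field: String) extends Expr
  case class Alias(child: Expr, name: String) extends Expr
  case class Sum(child: Expr) extends Expr

  // Trim Alias over GetStructField everywhere except at the top level,
  // mirroring the `trimAlias && !isTopLevel` condition quoted above.
  def trim(e: Expr, isTopLevel: Boolean = true): Expr = e match {
    case Alias(s: GetStructField, _) if !isTopLevel => s
    case Alias(c, n)                                => Alias(trim(c, isTopLevel = false), n)
    case Sum(c)                                     => Sum(trim(c, isTopLevel = false))
    case other                                      => other
  }

  def main(args: Array[String]): Unit = {
    // Top-level alias is preserved: it names the output column.
    val top = Alias(GetStructField("c", "json_string"), "json_string")
    assert(trim(top) == top)
    // Nested alias is trimmed; a later cleanup rule would remove it anyway.
    val nested = Sum(Alias(GetStructField("c", "json_string"), "json_string"))
    assert(trim(nested) == Sum(GetStructField("c", "json_string")))
  }
}
```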

Member

As we don't merge this yet, can you also add a comment here?

Contributor Author

As we don't merge this yet, can you also add a comment here?

Yea

Contributor Author

As we don't merge this yet, can you also add a comment here?

See the comment I added just now. cc @cloud-fan

viirya (Member) left a comment

Just minor comments, otherwise LGTM.

@viirya
Member

viirya commented Sep 1, 2020

Thanks for updating the PR description. It looks better now.

@SparkQA

SparkQA commented Sep 1, 2020

Test build #128151 has finished for PR 28490 at commit 51cea07.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 2, 2020

Test build #128170 has finished for PR 28490 at commit 84e65af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// names leading to ambiguous references exception.
case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
case a: Aggregate =>
Contributor

We need a high-level comment to explain why we trim alias here, e.g.

// SPARK-31670: ...

Contributor Author

We need a high-level comment to explain why we trim alias here, e.g.

// [SPARK-31670](https://issues.apache.org/jira/browse/SPARK-31670): ...

Done

@SparkQA

SparkQA commented Sep 2, 2020

Test build #128184 has finished for PR 28490 at commit e6fb91f.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu
Contributor Author

retest this please

@SparkQA

SparkQA commented Sep 2, 2020

Test build #128181 has finished for PR 28490 at commit 9411887.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 2, 2020

Test build #128188 has finished for PR 28490 at commit e6fb91f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu
Contributor Author

retest this please

@SparkQA

SparkQA commented Sep 2, 2020

Test build #128195 has finished for PR 28490 at commit e6fb91f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 5e6173e Sep 2, 2020
// names leading to ambiguous references exception.
case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
// SPARK-31607: Resolve Struct field in groupByExpressions and aggregateExpressions
Member

In the comment, SPARK-31607 seems to be a typo of SPARK-31670 because SPARK-31607 is Improve the perf of CTESubstitution.

Contributor Author

In the comment, SPARK-31607 seems to be a typo of SPARK-31670 because SPARK-31607 is Improve the perf of CTESubstitution.

Yea, sorry for my mistake.

Member

Never mind~, @AngersZhuuuu . :)

HyukjinKwon pushed a commit that referenced this pull request Nov 6, 2020
### What changes were proposed in this pull request?

This PR fixes incorrect JIRA ids in `Analyzer.scala` introduced by  SPARK-31670 (#28490)
```scala
- // SPARK-31607: Resolve Struct field in selectedGroupByExprs/groupByExprs and aggregations
+ // SPARK-31670: Resolve Struct field in selectedGroupByExprs/groupByExprs and aggregations
```

### Why are the changes needed?

Fix the wrong information.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This is a comment change. Manually review.

Closes #30269 from dongjoon-hyun/SPARK-31670-MINOR.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
maropu pushed a commit that referenced this pull request Apr 21, 2021
…ate UnresolvedAlias

### What changes were proposed in this pull request?

This PR partially backports #31758 to 3.1, to fix a backward compatibility issue caused by #28490

The query below has different output schemas in 3.0 and 3.1
```
sql("select struct(1, 2) as s").groupBy(col("s.col1")).agg(first("s"))
```

In 3.0 the output column name is `col1`; in 3.1 it's `s.col1`. This breaks existing queries.

In #28490, we changed the logic of resolving aggregate expressions. What happens is that the input nested column `s.col1` becomes `UnresolvedAlias(s.col1, None)`. In `ResolveReferences`, the logic used to directly resolve `s.col1` to `s.col1 AS col1`, but after #28490 we enter the code path with `trimAlias = true` and `!isTopLevel`, so the alias is removed, resulting in `s.col1`, which is then resolved in `ResolveAliases` as `s.col1 AS s.col1`.

#31758 happens to fix this issue because we no longer wrap `UnresolvedAttribute` with `UnresolvedAlias` in `RelationalGroupedDataset`.
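The naming difference above can be sketched as a toy model of the two analysis paths. This is a minimal sketch in Python for brevity, not Spark's actual API: `NestedField`, `Alias`, and `resolve` are hypothetical stand-ins for what `ResolveReferences`/`ResolveAliases` do to a nested grouping column.

```python
# Toy model (NOT Spark's API) of how an unresolved nested column such as
# s.col1 gets its output name during analysis.
from dataclasses import dataclass


@dataclass
class NestedField:
    struct: str
    field: str

    @property
    def output_name(self) -> str:
        # A bare nested field prints with its full dotted name.
        return f"{self.struct}.{self.field}"


@dataclass
class Alias:
    child: NestedField
    name: str

    @property
    def output_name(self) -> str:
        return self.name


def resolve(struct: str, field: str, trim_alias: bool):
    # The old path resolved "s.col1" directly to "s.col1 AS col1".
    resolved = Alias(NestedField(struct, field), field)
    if trim_alias:
        # After the change, the alias is trimmed, and the later re-aliasing
        # step wraps the bare expression with its full dotted name,
        # i.e. "s.col1 AS s.col1".
        bare = resolved.child
        return Alias(bare, bare.output_name)
    return resolved


print(resolve("s", "col1", trim_alias=False).output_name)  # col1   (3.0-style name)
print(resolve("s", "col1", trim_alias=True).output_name)   # s.col1 (3.1-style name)
```

The model only illustrates why trimming the implicit alias before the final re-aliasing step changes the observable output schema; the actual fix in #31758 avoids wrapping the grouping column in `UnresolvedAlias` in the first place.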

### Why are the changes needed?

Fix an unexpected query output schema change

### Does this PR introduce _any_ user-facing change?

Yes as explained above.

### How was this patch tested?

updated test

Closes #32239 from cloud-fan/bug.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
…ate UnresolvedAlias
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
…ate UnresolvedAlias
6 participants