[SPARK-12706] [SQL] grouping() and grouping_id() #10677
Conversation
Test build #49049 has finished for PR 10677 at commit
Test build #49050 has finished for PR 10677 at commit
/**
 * Aggregate function: returns the level of grouping, equals to
 *
 * (grouping(c1) << (n-1)) + (grouping(c1) << (n-2)) + ... + grouping(cn)
Second term should be grouping(c2)
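The corrected formula can be sketched in plain Scala (no Spark required). This is a hypothetical standalone helper, not Spark's implementation; the name `groupingId` and the bit-sequence representation are ours for illustration:

```scala
// Computes the grouping_id value from per-column grouping bits, following
// the corrected formula:
//   (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
// where bits(i) is 1 if column c(i+1) is aggregated away, 0 if it is grouped.
def groupingId(bits: Seq[Int]): Int = {
  val n = bits.length
  bits.zipWithIndex.map { case (bit, i) => bit << (n - 1 - i) }.sum
}

// Example: GROUP BY a, b, c with grouping set (a): a is grouped (bit 0),
// b and c are aggregated (bit 1), so grouping_id = (0 << 2) + (1 << 1) + 1 = 3.
```

The first column contributes the most-significant bit, which is why the second term must shift `grouping(c2)`, not `grouping(c1)`.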
@hvanhovell can you review this one?
I'll have a look.
aggsBuffer += e
e
case e if isPartOfAggregation(e) => e
case e: GroupingID =>
This is probably a dumb question. What happens if we use these functions without grouping sets? Do we get a nice analysis exception?
Right now it will fail to resolve; agreed that it should have a better error message.
Handled this in CheckAnalysis.
@davies the PR is in good shape. The two main (minor) issues I could find are:
Conflicts:
  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
This reverts commit 736e8d2.
Test build #51017 has finished for PR 10677 at commit
@davies I did some reading on it. As for the Hive compatibility, we can also add a few lines of comments in the Analyzer explaining why Hive is wrong.
Test build #51025 has finished for PR 10677 at commit
Test build #51027 has finished for PR 10677 at commit
@hvanhovell The analyzer knows nothing about Hive; where is the best place to put the comment?
@davies Yeah, you have a point there. We have inherited the wrong construction of the bitmask from Hive: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala#L202-L206. We could also fix/document it there.
Test build #51043 has finished for PR 10677 at commit
LGTM. I left a few minor final comments in CatalystQl.
Test build #51053 has finished for PR 10677 at commit
Test build #2534 has finished for PR 10677 at commit
Merging into master, thanks!
## What changes were proposed in this pull request?

Prior to this PR, the following code would cause an NPE:

```scala
case class point(a: String, b: String, c: String, d: Int)

val data = Seq(
  point("1", "2", "3", 1),
  point("4", "5", "6", 1),
  point("7", "8", "9", 1)
)
sc.parallelize(data).toDF().registerTempTable("table")
spark.sql("select a, b, c, count(d) from table group by a, b, c GROUPING SETS ((a))").show()
```

The reason is that when the grouping_id() behavior was changed in #10677, some code that should have been changed was left out. Taking the above code as an example: prior to #10677, the bit mask for set "(a)" was `001`, while after #10677 it became `011`. However, the `nonNullBitmask` was not changed accordingly. This PR fixes that problem.

## How was this patch tested?

Added integration tests.

Author: wangyang <[email protected]>

Closes #15416 from yangw1234/groupingid.

(cherry picked from commit fb0d608)
Signed-off-by: Herman van Hovell <[email protected]>
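The bitmask construction after the change can be illustrated with a small standalone sketch (plain Scala, not Spark's actual code; the helper name `bitmask` is ours). The first GROUP BY column occupies the most-significant bit, and a bit is 1 when the column is aggregated away, i.e. not part of the grouping set:

```scala
// Build the grouping-set bitmask: walk the GROUP BY columns left to right,
// shifting in a 0 for columns present in the grouping set and a 1 for
// columns that are aggregated away.
def bitmask(groupByCols: Seq[String], groupingSet: Set[String]): Int =
  groupByCols.foldLeft(0) { (mask, col) =>
    (mask << 1) | (if (groupingSet.contains(col)) 0 else 1)
  }

// GROUP BY a, b, c GROUPING SETS ((a)): only `a` is grouped, so the mask is
// binary 011 = 3 — the `011` mentioned in the PR description above.
```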
grouping() returns whether a column is aggregated or not; grouping_id() returns the aggregation level.
grouping()/grouping_id() can be used with window functions, but do not work in HAVING/ORDER BY clauses; that will be fixed by another PR.
The GROUPING__ID/grouping_id() in Hive is wrong (according to its own docs), and we had implemented it the same wrong way. This PR changes that to match the behavior in most databases (and in Hive's docs).
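Under the fixed bit layout, each per-column grouping() value can be read back out of a grouping_id() result. A hypothetical standalone sketch (plain Scala; `groupingBit` is our name, not part of any Spark API):

```scala
// Recover grouping(c_i) for the i-th of n GROUP BY columns (0-indexed from
// the left) from a grouping_id value, where c1 occupies the most-significant
// bit: a result of 1 means the column was aggregated away, 0 means grouped.
def groupingBit(groupingId: Int, i: Int, n: Int): Int =
  (groupingId >> (n - 1 - i)) & 1

// For grouping_id = 3 (binary 011) over (a, b, c): a is grouped,
// while b and c are aggregated.
```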