[SPARK-8160][SQL]Support using external sorting to run aggregate #6875
Conversation
Test build #35110 has finished for PR 6875 at commit
Test build #35122 has finished for PR 6875 at commit
@JoshRosen this may be of interest to you
@andrewor14, yes, definitely 😄 I have a patch (#6444) to implement an optimized binary processing sort for use in Spark SQL, and the change here will amplify the benefits of the new sort, so I'm super excited about this.
@JoshRosen Yes, I have looked at patch #6444. This PR has no conflict with #6444 because it simply implements sort-based aggregation after sorting by the group key, so it can run on top of both the external sort and the binary processing sort.
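To make that idea concrete, here is a minimal, self-contained sketch (not this PR's code) of sort-based aggregation over input that is already sorted by its grouping key: consecutive rows with the same key are folded into one buffer, and the group is emitted as soon as the key changes. The names `sortBasedAggregate`, `newBuffer`, and `update` are illustrative assumptions.

```scala
// Sketch of sort-based aggregation over rows pre-sorted by grouping key K.
// Because the input is sorted, a group never reappears later, so each group
// can be emitted as soon as its key stops matching the current one.
def sortBasedAggregate[K, V, B](
    sortedRows: Iterator[(K, V)],   // rows already sorted by grouping key
    newBuffer: () => B,             // create an empty aggregation buffer
    update: (B, V) => B             // fold one value into the buffer
): Iterator[(K, B)] = new Iterator[(K, B)] {
  private val input = sortedRows.buffered
  override def hasNext: Boolean = input.hasNext
  override def next(): (K, B) = {
    val (key, firstValue) = input.next()
    var buffer = update(newBuffer(), firstValue)
    // Consume all consecutive rows that share the current key.
    while (input.hasNext && input.head._1 == key) {
      buffer = update(buffer, input.next()._2)
    }
    (key, buffer)
  }
}

// Example: summing values per key over an input already sorted by key.
val sorted = Iterator(("a", 1), ("a", 2), ("b", 5), ("c", 3), ("c", 4))
sortBasedAggregate[String, Int, Int](sorted, () => 0, _ + _).foreach(println)
// prints (a,3), (b,5), (c,7)
```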
@lianhuiwang Can we do hash-based aggregation first, then switch to sort-based if we cannot hold all of it in memory? (We can still have a flag to disable it.)
@davies If we cannot hold all of it in memory and then switch to sort-based, we would have to re-shuffle the data to do the sort, so the computation cost is very expensive. I think the choice should be determined by statistics before physical plan execution. The problem is similar to choosing between hash join and sort-merge join: sort-merge join is currently determined by spark.sql.planner.sortMergeJoin (default false). Like sort-merge join, the sort-based aggregation in this PR is determined by spark.sql.planner.sortMergeAggregate (default false).
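For reference, a hedged example of how such a flag would be toggled, by analogy with the existing sort-merge-join switch. Note that `spark.sql.planner.sortMergeAggregate` is the key proposed by this PR and does not exist in released Spark; the app name and master URL below are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(
  new SparkConf().setAppName("agg-config-demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Existing switch: prefer sort-merge join over shuffled hash join (default false).
sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
// Switch proposed by this PR (default false); not present in released Spark.
sqlContext.setConf("spark.sql.planner.sortMergeAggregate", "true")
```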
@lianhuiwang Aggregation is different from join, because aggregation can reduce the data size but join cannot. The optimizer can figure out whether to use broadcast join or sort-merge join based on the size of the table, but it is very hard to guess what the memory consumption of an aggregation will be (it is determined by the number of unique groups and the aggregation algorithm). All the aggregation happens within a partition, so no extra shuffling is needed. Usually there are two aggregations, one before and one after the shuffle.
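The "two aggregations around the shuffle" pattern described above can be illustrated with plain RDD operations; this is only an analogy, not the Spark SQL physical operators discussed in this PR.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("two-phase-agg").setMaster("local[*]"))
val pairs = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(w => (w, 1L))
// reduceByKey combines within each partition first (partial aggregation),
// shuffles the compacted partial results, then merges them on the reduce
// side (final aggregation) -- so the data crossing the shuffle is already
// much smaller than the raw input.
val counts = pairs.reduceByKey(_ + _)
counts.collect().foreach(println)
```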
@davies Thanks, yes, I get it. I think it is similar to the aggregation done in ExternalSorter.
I think we should close this PR for now while we review the other one; let's re-open if necessary.
OK @JoshRosen, thanks, I will close this PR.
Add the config 'spark.sql.planner.sortMergeAggregate' to turn on sort-merge aggregation (default is false), and add two classes to support it:
SortMergeAggregate is for Aggregate plans that cannot be codegened.
SortMergeGeneratedAggregate is for GeneratedAggregate plans that can be codegened.
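As a rough illustration of the planning decision described above, here is a minimal, self-contained sketch; the operator case classes, the `planAggregate` helper, and the config lookup below are simplified stand-ins, not the PR's actual Catalyst strategy.

```scala
// Hypothetical physical operators mirroring the classes named in the PR
// description; these are stand-ins, not the real Spark SQL plan nodes.
sealed trait PhysicalOp
case class HashAggregateOp(groupingKeys: Seq[String]) extends PhysicalOp
case class SortMergeAggregateOp(groupingKeys: Seq[String]) extends PhysicalOp          // non-codegen path
case class SortMergeGeneratedAggregateOp(groupingKeys: Seq[String]) extends PhysicalOp // codegen path

// Choose an operator based on the proposed flag and whether codegen is enabled.
def planAggregate(
    groupingKeys: Seq[String],
    codegenEnabled: Boolean,
    conf: Map[String, String]): PhysicalOp = {
  val useSortMerge =
    conf.getOrElse("spark.sql.planner.sortMergeAggregate", "false").toBoolean
  if (useSortMerge && codegenEnabled) SortMergeGeneratedAggregateOp(groupingKeys)
  else if (useSortMerge) SortMergeAggregateOp(groupingKeys)
  else HashAggregateOp(groupingKeys)
}

// Example: with the flag on and codegen enabled, the codegen sort-merge
// operator is selected.
println(planAggregate(Seq("groupKey"), codegenEnabled = true,
  Map("spark.sql.planner.sortMergeAggregate" -> "true")))
```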