
Support StreamAggregation / streaming group by #5133

Closed
Tracked by #4973
xiaoyong-z opened this issue Jan 31, 2023 · 14 comments · Fixed by #6124
Labels
enhancement New feature or request

Comments

@xiaoyong-z
Contributor

xiaoyong-z commented Jan 31, 2023

If the input is already sorted on the GROUP BY columns before the aggregation, we can use stream aggregation, which is more efficient than HashAggregation.
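To make the idea concrete, here is a minimal sketch of streaming aggregation (a SUM per key) over rows that are assumed to be already sorted by the group key. It uses plain Rust tuples rather than Arrow arrays, and `stream_sum` is a hypothetical name, not a DataFusion API. A hash aggregation keeps an entry for every distinct key until the input is exhausted; this sketch only keeps the current group in memory and can emit each group as soon as the key changes.

```rust
/// Sketch: SUM(value) GROUP BY key over rows already sorted by key.
/// `stream_sum` is a hypothetical name, not a DataFusion API.
fn stream_sum(rows: &[(i64, i64)]) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    // Only the group currently being accumulated is held in memory.
    let mut current: Option<(i64, i64)> = None;
    for &(key, value) in rows {
        if let Some((k, sum)) = current.as_mut() {
            if *k == key {
                // Same group as the previous row: keep accumulating.
                *sum += value;
                continue;
            }
        }
        // First row, or the key changed: the previous group is complete
        // and could be emitted downstream right away (collected in `out` here).
        if let Some(done) = current.take() {
            out.push(done);
        }
        current = Some((key, value));
    }
    // Flush the last group once the input is exhausted.
    if let Some(done) = current {
        out.push(done);
    }
    out
}

fn main() {
    let rows: [(i64, i64); 5] = [(1, 10), (1, 5), (2, 7), (3, 1), (3, 2)];
    println!("{:?}", stream_sum(&rows)); // [(1, 15), (2, 7), (3, 3)]
}
```

If the input is not actually sorted, the same key can reappear later and is emitted as a second group, which is why the planner may only pick this operator when the input ordering is guaranteed.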

@xiaoyong-z xiaoyong-z added the enhancement New feature or request label Jan 31, 2023
@xiaoyong-z
Contributor Author

xiaoyong-z commented Jan 31, 2023

Hello @alamb, it seems that DataFusion currently doesn't have StreamAggregation. If no one is working on this, I would like to implement it.

@alamb
Contributor

alamb commented Jan 31, 2023

Hi @xiaoyong-z -- that is great news! -- I believe that @metesynnada and @ozankabak mentioned they wanted to work on this feature. Let's use this ticket to collaborate on a design.

I believe #1570 is also related, as streaming grouping is often used to merge spilled groups. @milenkovicm and I had some discussion about this in #1570 (comment), but I never followed through with a write-up.

@ozankabak
Contributor

This was on our roadmap and we would love to help out on this. @alamb, if you can share the papers/resources you mentioned, we can digest them and share our thinking on the design. @xiaoyong-z, do you have a particular design in mind yet?

@alamb
Contributor

alamb commented Jan 31, 2023

I will start a Google doc for us to collaborate on.

@alamb
Contributor

alamb commented Jan 31, 2023

Here is a Google doc with some ideas: https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit?usp=sharing

I have it in "comment" mode for everyone on the internet, but please feel free to request edit access and I will grant it.

@ozankabak
Contributor

Thank you for putting this together; I had an initial look. I expect us to do a deeper dive, read the mentioned papers, and give meaningful comments in the next several days.

@Ted-Jiang
Member

Our team is also looking forward to this feature and the memory limited aggregation 👍

@xiaoyong-z
Contributor Author

xiaoyong-z commented Feb 1, 2023

Thank you all. I'm still at a very early stage, and I plan to study some papers and how other systems implement this over the next few days. @alamb, thanks for sharing the Google doc; I will add my design plan to it.

@xiaoyong-z
Contributor Author

xiaoyong-z commented Feb 7, 2023

I added some plans for implementing stream aggregation to https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit?usp=sharing. PTAL. A detailed design for full stream aggregation will follow in the next few days.

@alamb
Contributor

alamb commented Feb 8, 2023

> PTAL. A detailed design for full stream aggregation will follow in the next few days.

Thank you @xiaoyong-z -- I read your addition and left some comments. Overall I think it is a great idea.

Here is one possible approach to implementation (perhaps what you had in mind):

  1. Implement StreamAggregate that handles pre-sorted data (where the data is already sorted according to the grouping keys/partition keys).
  2. Remove AggregateStream https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/aggregates/no_grouping.rs and replace its use in the optimizer with the new StreamAggregate operator.
  3. Update the optimizer to recognize when the input to a GroupByHash is sorted appropriately and switch to using the StreamAggregate operator.
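Step 3 hinges on deciding when the input ordering makes streaming safe. Below is a simplified sketch of that check, assuming orderings and GROUP BY keys are given as plain column names; `AggStrategy` and `choose_strategy` are hypothetical and not DataFusion's optimizer API. Streaming is valid when the leading sort columns are exactly the grouping columns, in any order, because every group's rows are then contiguous. An empty GROUP BY forms a single group and is always streamable, in line with step 2's idea of folding the no-grouping case into the new operator.

```rust
use std::collections::HashSet;

/// Hypothetical strategy enum for illustration; not a DataFusion type.
#[derive(Debug, PartialEq)]
enum AggStrategy {
    Hash,
    Stream,
}

/// Simplified check: stream only when the input's leading sort columns are
/// exactly the GROUP BY columns (in any order), so each group's rows are
/// contiguous. Ignores equivalence classes and partially sorted inputs.
fn choose_strategy(input_ordering: &[&str], group_by: &[&str]) -> AggStrategy {
    // No GROUP BY: all rows form a single group, which is trivially contiguous.
    if group_by.is_empty() {
        return AggStrategy::Stream;
    }
    // Not enough sorted columns to cover every grouping key.
    if input_ordering.len() < group_by.len() {
        return AggStrategy::Hash;
    }
    let prefix: HashSet<&str> = input_ordering[..group_by.len()].iter().copied().collect();
    let keys: HashSet<&str> = group_by.iter().copied().collect();
    if prefix == keys {
        AggStrategy::Stream
    } else {
        AggStrategy::Hash
    }
}

fn main() {
    // Input sorted by (region, day): GROUP BY day, region can stream.
    println!("{:?}", choose_strategy(&["region", "day"], &["day", "region"])); // Stream
    // Input sorted only by region: GROUP BY region, day falls back to hashing.
    println!("{:?}", choose_strategy(&["region"], &["region", "day"])); // Hash
}
```

A real optimizer would likely also need to account for column equivalences and inputs sorted on only a subset of the keys; this sketch conservatively falls back to hashing in those cases.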

I think that would get us pretty far.

I am not sure about the idea of "sort the data first and then run the stream aggregator" -- as I mentioned in the document, I think it is unlikely that this approach will be better in terms of overall memory usage or performance.

When we want to support a spilling group by (external group by), that is when sorting might be beneficial.
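For the spilling case, one common sort-based approach (sketched here only as an illustration; it is not necessarily the design DataFusion will adopt) is to sort each batch of partial aggregates by key before spilling it, then k-way merge the sorted spill runs and combine equal keys in a single streaming pass. In this sketch, in-memory `Vec`s stand in for spill files and `merge_spilled_runs` is a hypothetical name.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merges sorted spill runs of (key, partial_sum) and combines equal keys.
/// Working memory is one heap entry per run plus the group being combined.
fn merge_spilled_runs(runs: &[Vec<(i64, i64)>]) -> Vec<(i64, i64)> {
    // Min-heap (via Reverse) of (key, partial, run index, position in run).
    let mut heap = BinaryHeap::new();
    for (r, run) in runs.iter().enumerate() {
        if let Some(&(k, v)) = run.first() {
            heap.push(Reverse((k, v, r, 0usize)));
        }
    }
    // Collected into a Vec for simplicity; a real operator would emit each
    // finished group downstream instead.
    let mut out: Vec<(i64, i64)> = Vec::new();
    while let Some(Reverse((k, v, r, i))) = heap.pop() {
        // Refill the heap from the run we just consumed from.
        if let Some(&(nk, nv)) = runs[r].get(i + 1) {
            heap.push(Reverse((nk, nv, r, i + 1)));
        }
        // Keys leave the heap in sorted order, so equal keys are adjacent.
        if let Some((last_k, sum)) = out.last_mut() {
            if *last_k == k {
                *sum += v;
                continue;
            }
        }
        out.push((k, v));
    }
    out
}

fn main() {
    // Two hypothetical spill runs, each sorted by key with partial sums.
    let runs: Vec<Vec<(i64, i64)>> = vec![vec![(1, 10), (3, 4)], vec![(1, 5), (2, 7), (3, 1)]];
    println!("{:?}", merge_spilled_runs(&runs)); // [(1, 15), (2, 7), (3, 5)]
}
```

The merge only requires each run to be sorted, so memory during the final pass is bounded by the number of runs rather than the number of distinct groups.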

@ozankabak
Contributor

@mustafasrepo, can you take a detailed look at @xiaoyong-z's design? Thanks.

@mustafasrepo
Contributor

mustafasrepo commented Feb 9, 2023

Thanks for the design, @xiaoyong-z. I asked some questions to understand it better and left some comments. Overall, I think your roadmap is well thought out.

@alamb alamb changed the title Support StreamAggregation Support StreamAggregation / streaming group by Mar 3, 2023
@mustafasrepo
Contributor

Hi @xiaoyong-z, if you are not already working on this feature, I can take on some of the tasks from the document. Specifically, I would like to start with this case:

  • 1. Implement StreamAggregate that handles pre-sorted data (where the data is already sorted according to the grouping keys/partition keys). (I think this corresponds to the 2nd step in the document.)

@xiaoyong-z
Contributor Author

@mustafasrepo Sorry, I currently don't have time to push this work forward.
If you have time, feel free to work on any part of this feature.

5 participants