Memory Limited GroupBy (Externalized / Spill) #1570
For hash aggregation, I quite like the implementation described in https://dl.acm.org/doi/pdf/10.1145/2588555.2610507 (section 4.4). The algorithm there optimizes for cache locality by pre-aggregating into fixed-size thread-local hash tables and then overflowing into partitions in main memory, but I think it should be easy to extend it to also flush these partitions to disk if they get too large.
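The pre-aggregate-then-overflow idea from that paper can be sketched roughly as below. This is a simplified, single-threaded sketch under stated assumptions, not the paper's code or DataFusion's: `PreAggregator` and `finalize` are hypothetical names, keys are `u64`, the aggregate is a SUM, and the overflow partitions are plain in-memory vectors where a spilling variant would write full partitions to disk.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical sketch: a fixed-capacity local pre-aggregation table (SUM per
/// key) that overflows into hash partitions when it is full.
pub struct PreAggregator {
    capacity: usize,
    local: HashMap<u64, i64>,
    /// Overflow partitions; a spilling variant would write these to disk.
    pub partitions: Vec<Vec<(u64, i64)>>,
}

impl PreAggregator {
    pub fn new(capacity: usize, num_partitions: usize) -> Self {
        Self {
            capacity,
            local: HashMap::with_capacity(capacity),
            partitions: vec![Vec::new(); num_partitions],
        }
    }

    /// Every occurrence of a key is routed to the same partition.
    fn partition_of(&self, key: u64) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() % self.partitions.len() as u64) as usize
    }

    /// Add `value` to the partial SUM of `key`, flushing first if a new key
    /// would not fit in the local table.
    pub fn update(&mut self, key: u64, value: i64) {
        if !self.local.contains_key(&key) && self.local.len() >= self.capacity {
            self.flush();
        }
        *self.local.entry(key).or_insert(0) += value;
    }

    /// Move all partial aggregates from the local table into their partitions.
    pub fn flush(&mut self) {
        let entries: Vec<(u64, i64)> = self.local.drain().collect();
        for (key, partial) in entries {
            let p = self.partition_of(key);
            self.partitions[p].push((key, partial));
        }
    }
}

/// Final phase: combine the partial aggregates of each partition on its own.
pub fn finalize(partitions: &[Vec<(u64, i64)>]) -> Vec<HashMap<u64, i64>> {
    partitions
        .iter()
        .map(|part| {
            let mut groups = HashMap::new();
            for &(key, partial) in part {
                *groups.entry(key).or_insert(0) += partial;
            }
            groups
        })
        .collect()
}
```

Because every key hashes to exactly one partition, partitions can be finalized (or spilled and read back) one at a time, which is what makes the extension to disk fairly direct.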
An idea to brainstorm this feature: master...milenkovicm:arrow-datafusion:spill_aggregation_to_mmap has POC code which uses a memory-mapped file as the aggregation store, which should spill to disk in case of memory pressure. This solution is just a prototype, far from perfect, and there is a good chance it is not fit for purpose. The main change is that
Problems with this solution:
Good point(s):
No performance numbers at the moment, and not much experience with DataFusion. Any comments welcome.
I will try and review this sometime later today.
Thanks @alamb, just a follow-up with some numbers from running a simple query on 10M distinct keys and 50M records total (each key has 5 records). With this change:
Without change:
Thanks for the info @milenkovicm. In general, I think database systems tend to shy away from using mmap for spilling operations, for the reasons very clearly articulated in "Are You Sure You Want to Use MMAP in Your Database Management System?" https://db.cs.cmu.edu/mmap-cidr2022/ I think the current state of the art / best practice for dealing with grouping when the hash table doesn't fit in memory is to:
Thanks to @yjshen we have much of the spilling / merging code already. We are lacking:
Does that make sense? I know it is a high-level description.
Thanks @alamb, would you have any design doc regarding merge group by and re-merge which you can share?
I do not have a design document, sadly. I can try and help write one (perhaps I can make a diagram and example) if you are interested in working on it @milenkovicm
I'd be interested. After a big sleep I think I get your approach, but if you can produce a diagram it would be great. Thanks @alamb
😆 sounds good -- I will try and write up something in the coming days to start a discussion
* Change how the final aggregation row group is created ... this change prevents cloning the whole state, which doubled the memory needed for aggregation. This PR relates to #1570
* Fix clippy issues
* Read batch size from `session_config`
Update on this task: @crepererum has added initial support to track the memory use in the grouping operation. We (@tustvold / @crepererum, and myself as ticket master) have plans to improve the grouping operation, which I will make a bit more discoverable shortly.
@milenkovicm very belatedly, here is a document / diagrams: https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit?usp=sharing
@alamb @yjshen
You still need disk spilling, no? Or where do you store the serialized state? Also, I guess that serialization may become a major bottleneck for some of the accumulators.
Yes, we still need disk spilling; the disk spilling can be managed and tracked by the
I'm not sure I see many benefits of having it serializable; I would agree with @crepererum. IMHO, aggregation should start with a hash map: we can assume that there is not going to be a spill, and if we're wrong we pay the penalty of being wrong, as we will have to sort it before spilling. Once we have spilled to disk, I'd argue it would make more sense to switch from a hash map to a B-tree, as we would need to merge it with the spill; it is slower, but in my experience it is a bit faster than sorting the hash map. Spilling can be implemented using a two-column Parquet file (key: blob, value: blob). An implementation like this works quite well in my experience, especially since in most cases we won't trigger a spill.
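A rough sketch of the hash-map-then-B-tree strategy described in the comment above, assuming a SUM over string keys. `SpillingSum`, `Store`, and `max_groups` are hypothetical; `max_groups` is a group-count budget standing in for a real memory budget, and spilled runs are kept in a `Vec` rather than written to a two-column Parquet file.

```rust
use std::collections::{BTreeMap, HashMap};

/// Where groups live: a hash map until the first spill, a B-tree afterwards.
pub enum Store {
    Hash(HashMap<String, i64>),
    Tree(BTreeMap<String, i64>),
}

/// Hypothetical sketch: SUM per key with a group-count budget (`max_groups`)
/// standing in for a real memory budget. Spilled runs are kept in memory here;
/// a real implementation might write each run as a two-column (key, value) file.
pub struct SpillingSum {
    max_groups: usize,
    pub store: Store,
    pub runs: Vec<Vec<(String, i64)>>,
}

impl SpillingSum {
    pub fn new(max_groups: usize) -> Self {
        Self { max_groups, store: Store::Hash(HashMap::new()), runs: Vec::new() }
    }

    fn num_groups(&self) -> usize {
        match &self.store {
            Store::Hash(m) => m.len(),
            Store::Tree(m) => m.len(),
        }
    }

    /// Add `value` to the SUM for `key`; spill once the budget is exceeded.
    pub fn update(&mut self, key: &str, value: i64) {
        match &mut self.store {
            Store::Hash(m) => *m.entry(key.to_string()).or_insert(0) += value,
            Store::Tree(m) => *m.entry(key.to_string()).or_insert(0) += value,
        }
        if self.num_groups() > self.max_groups {
            self.spill();
        }
    }

    /// Emit the current groups as a run sorted by key and continue with an
    /// empty B-tree.
    pub fn spill(&mut self) {
        let old = std::mem::replace(&mut self.store, Store::Tree(BTreeMap::new()));
        let run: Vec<(String, i64)> = match old {
            // The "penalty of being wrong": hash map contents must be sorted.
            Store::Hash(m) => {
                let mut entries: Vec<(String, i64)> = m.into_iter().collect();
                entries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
                entries
            }
            // A B-tree already iterates in key order, so no sort is needed.
            Store::Tree(m) => m.into_iter().collect(),
        };
        self.runs.push(run);
    }
}
```

The design choice is that the common no-spill case only ever pays for hash-map inserts; the sort cost is paid once, at the first spill, and every later run comes out of the B-tree already ordered for the merge.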
@mingmwang when we serialize the groups to disk because the hash table is full, we then need to read them back in again somehow and do the final combination. My assumption is that we don't have the memory to read them all back in at once, as we had to spill in the first place. If we sort the spilled data on the group keys, we can stream data from the different spill files in parallel, merge it, and then do a streaming group by, which we will have sufficient memory to accomplish.
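The sort / merge / streaming group by steps above can be sketched as a k-way merge over key-sorted runs feeding a streaming SUM. This is an illustration under assumptions, not DataFusion's code: `merge_and_aggregate` is a hypothetical name, and the runs are in-memory vectors standing in for spill files that would be read incrementally. The merge itself only holds one head entry per run plus a single open group, which is why this final phase fits in memory even when the hash table did not.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical sketch: merge runs that are each sorted by group key and
/// compute SUM per key in a single streaming pass over the merged order.
pub fn merge_and_aggregate(runs: Vec<Vec<(String, i64)>>) -> Vec<(String, i64)> {
    let mut iters: Vec<std::vec::IntoIter<(String, i64)>> =
        runs.into_iter().map(|run| run.into_iter()).collect();

    // Min-heap (via `Reverse`) holding the head entry of each run, tagged
    // with the index of the run it came from.
    let mut heap = BinaryHeap::new();
    for (i, it) in iters.iter_mut().enumerate() {
        if let Some((key, value)) = it.next() {
            heap.push(Reverse((key, value, i)));
        }
    }

    let mut out = Vec::new();
    let mut current: Option<(String, i64)> = None;
    while let Some(Reverse((key, value, i))) = heap.pop() {
        // Replace the popped head with the next entry from the same run.
        if let Some((next_key, next_value)) = iters[i].next() {
            heap.push(Reverse((next_key, next_value, i)));
        }
        // Equal keys arrive consecutively, so only one group is open at a time.
        let same_group = current.as_ref().map_or(false, |(k, _)| *k == key);
        if same_group {
            if let Some((_, sum)) = current.as_mut() {
                *sum += value;
            }
        } else if let Some(finished) = current.replace((key, value)) {
            out.push(finished);
        }
    }
    if let Some(finished) = current {
        out.push(finished);
    }
    out
}
```

For example, merging the runs `[(a, 2), (b, 1), (c, 3)]` and `[(a, 4), (d, 5)]` yields `[(a, 6), (b, 1), (c, 3), (d, 5)]`.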
Yes, I agree with @milenkovicm, this approach will work well. It does have the disadvantage of a performance "cliff" when the query goes from in memory --> spilling (it doesn't degrade gracefully), but it is likely the fastest for queries that have sufficient memory.
BTW, after #6889 I think we'll be in pretty good shape to dump the accumulator state to disk (as Arrow arrays, perhaps) if we want to pursue this more.
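To illustrate what dumping accumulator state column-wise could look like: AVG, for example, can be finished from a per-group running sum and count, so its partial state is just a key column plus two state columns. This sketch uses plain `Vec`s and a hand-rolled little-endian encoding as a stand-in for Arrow arrays and the Arrow IPC format; `AvgState`, `to_bytes`, `from_bytes`, and `finish` are hypothetical and are not DataFusion or arrow-rs APIs.

```rust
/// Hypothetical sketch: per-group partial state for AVG stored column-wise
/// (one vector per field), standing in for Arrow arrays.
#[derive(Debug, PartialEq)]
pub struct AvgState {
    pub keys: Vec<u64>,
    pub sums: Vec<f64>,
    pub counts: Vec<u64>,
}

impl AvgState {
    /// Encode as: row count, then the whole `keys` column, then `sums`, then
    /// `counts`, each value as 8 little-endian bytes.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(8 * (1 + 3 * self.keys.len()));
        buf.extend_from_slice(&(self.keys.len() as u64).to_le_bytes());
        for k in &self.keys {
            buf.extend_from_slice(&k.to_le_bytes());
        }
        for s in &self.sums {
            buf.extend_from_slice(&s.to_le_bytes());
        }
        for c in &self.counts {
            buf.extend_from_slice(&c.to_le_bytes());
        }
        buf
    }

    /// Decode the layout written by `to_bytes` (panics on truncated input).
    pub fn from_bytes(buf: &[u8]) -> Self {
        // The i-th 8-byte word of the buffer.
        let word = |i: usize| {
            let mut w = [0u8; 8];
            w.copy_from_slice(&buf[i * 8..i * 8 + 8]);
            w
        };
        let n = u64::from_le_bytes(word(0)) as usize;
        Self {
            keys: (0..n).map(|i| u64::from_le_bytes(word(1 + i))).collect(),
            sums: (0..n).map(|i| f64::from_le_bytes(word(1 + n + i))).collect(),
            counts: (0..n).map(|i| u64::from_le_bytes(word(1 + 2 * n + i))).collect(),
        }
    }

    /// Finish the aggregate: AVG = sum / count for each group.
    pub fn finish(&self) -> Vec<(u64, f64)> {
        self.keys
            .iter()
            .zip(&self.sums)
            .zip(&self.counts)
            .map(|((k, s), c)| (*k, *s / *c as f64))
            .collect()
    }
}
```

Writing whole columns at a time, rather than one serialized accumulator per group, is what keeps this cheap: each column is a contiguous run of fixed-width values, which is the property Arrow arrays provide.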
Hi @alamb, I have some POC code, a bit stale though, which dumps state to Arrow. I tried to rebase it recently but did not have time to finish it. It's in my branch at https://github.com/milenkovicm/arrow-datafusion/commit/e9d17d2a456707f88497606b3977a8a82199c7d1 Not sure if it still makes sense after all this time.
Thanks @milenkovicm -- that actually looks quite cool. If/when someone picks up this ticket, I think it would be a good place to start from. I especially like how you encapsulated the spilling / state code in its own module.
Hello, we have an internal spilling implementation for partial aggregation. We are currently in the middle of porting the recent aggregation improvements (such as #7095) to it. After that is done, we plan to contribute the work back to DataFusion, but I just found this ticket. I am wondering what the current status of this ticket is, and whether the community is willing to review yet another spilling implementation...
That sounds good
I am willing to review another spilling implementation for sure -- it is one of the missing features for DataFusion to be a complete analytic solution. To assist review, it would help to explain / document how it works (e.g. is it based on mmap, are the intermediate groups spilled as Arrow, or are they sorted?), ideally in code comments. I made some diagrams here that might be helpful (but they are mostly related to streaming group by).
Thank you @alamb
I finally posted a PR for this: #7400
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Support Grouping "arbitrarily" large inputs (e.g. when the group by hash table doesn't fit in RAM or within some user defined budget)
This typically happens when there are a "large" number of distinct groups. For example, with queries like
When there are a large number of groups
This ticket concerns the memory used by the `HashAggregateExec` operator -- it doesn't cover other potential targets (e.g. externalized sort or join). Those will be covered by other tasks tracked by #587.
Describe the solution you'd like
For the `HashAggregateExec` operator, I think the best behavior would be:
`HashAggregateExec`), if the memory budget allows
Hopefully after #1568 is complete we'll have an efficient N-way merge that can be reused.
Some ideas of how to break this task down:
* `row_hash.rs` and `hash.rs` (remove duplication) #2723
* `Aggregator`, grouping, aggregation #4973
Describe alternatives you've considered
TBD
Context
This is follow on work from the great PR from @yjshen in #1526 and part of the story of limiting memory used by DataFusion #587