feat: Support spilling for hash aggregation #7400
Conversation
Waiting for #7399 for the fmt issues.
Thank you @kazuyukitanimura -- I plan to review this PR tomorrow. I am very excited!
Thanks @kazuyukitanimura for working on this! This is one of the last few missing cornerstones in DataFusion. Cheers!
Thank you very much @kazuyukitanimura -- I think this is really nicely written. I left some comments for your consideration. I think we could simplify some of the traits and memory accounting and still keep most of the benefits of this PR.
I also would love to test this myself locally, but it occurs to me I can't actually do so because datafusion CLI doesn't have any way to do it. I'll file a ticket
From my perspective the only things that would be required for merging this PR would be:
- Performance tests showing it doesn't slow down performance (I don't expect it to and I could help with this)
- A few more tests that exercise the spill path after aggregating more than one batch of data (I have left a suggestion on how to do that with `FairSpillPool`; see the sketch after this comment)
Again, really nice work and thank you very much for the contribution
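A minimal sketch of forcing the spill path locally with a `FairSpillPool`, assuming the `RuntimeEnv`/`SessionContext` constructors as they existed around this release (names may differ in later DataFusion versions); the table name and CSV path are hypothetical:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Cap the pool at 4 MB so a large GROUP BY cannot keep all group
    // values and accumulators in memory and has to spill to disk.
    let pool = Arc::new(FairSpillPool::new(4 * 1024 * 1024));
    let runtime = RuntimeEnv::new(RuntimeConfig::new().with_memory_pool(pool))?;
    let ctx = SessionContext::with_config_rt(SessionConfig::new(), Arc::new(runtime));

    // "data.csv" stands in for any input large enough to exceed the pool.
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;
    ctx.sql("SELECT k, COUNT(*) FROM t GROUP BY k").await?.show().await?;
    Ok(())
}
```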
Thank you @alamb for reviewing. I plan to work on addressing comments a week after next (9/5~) once I am back from my travel.
Regarding the benchmark, what would be the best way to proceed? I read https://github.com/apache/arrow-datafusion/blob/main/benchmarks/README.md
This is what I recommend. I have some scripts in https://github.com/alamb/datafusion-benchmarking that I used to compare a branch to main, but they aren't super user friendly at the moment. I'll run this branch on a gcp machine and report back. Thanks again!
Marking as draft to signify this PR has gotten feedback and is waiting to incorporate it before subsequent review.
Thank you @kazuyukitanimura - this is a really nice piece of technology, as well as a nice example of collaboration. I agree let's merge this and continue working on main / follow-on PRs. I filed #7571 to track adding spilling group benchmarks.
@Dandandan I believe the reason is that Q15 is so fast (15 ms!) that it is susceptible to small perturbations, and the difference is largely measurement error.
Great, thank you all again.
```rust
];
let expected = if spill {
    vec![
        "+---+-----+-----------------+",
```
@kazuyukitanimura Hi, do you remember why the result of `spill` is different from the non-spill one?
I think the reason is that we are comparing the output of a partial aggregate, and when spilling we also have a lower desired batch size and hit the emit-early logic:
datafusion/physical-plan/src/aggregates/row_hash.rs, lines 934 to 945 at 5b6b404:

```rust
fn emit_early_if_necessary(&mut self) -> Result<()> {
    // Only emit early for partial, unordered aggregation, when at least one
    // full output batch has accumulated and the memory reservation cannot grow.
    if self.group_values.len() >= self.batch_size
        && matches!(self.group_ordering, GroupOrdering::None)
        && matches!(self.mode, AggregateMode::Partial)
        && self.update_memory_reservation().is_err()
    {
        // Emit the largest multiple of batch_size groups accumulated so far.
        let n = self.group_values.len() / self.batch_size * self.batch_size;
        let batch = self.emit(EmitTo::First(n), false)?;
        self.exec_state = ExecutionState::ProducingOutput(batch);
    }
    Ok(())
}
```
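To make the `n` computation concrete, here is a tiny self-contained check of the integer arithmetic (the numbers are illustrative, not taken from the PR):

```rust
fn main() {
    let (group_len, batch_size) = (2500_usize, 1024);
    // Integer division floors, so n is the largest multiple of batch_size
    // that fits in group_len: two full batches get emitted early here.
    let n = group_len / batch_size * batch_size;
    assert_eq!(n, 2048); // the remaining 452 groups stay in memory
}
```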
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right. When spilling happens, it means we don't have enough memory to hold the original batch, so we do partial aggregation on smaller batches. The partial aggregation results will differ, but that doesn't change the final aggregation result.
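A tiny illustration of why the final result is unaffected, with made-up counts: whether a group's rows are counted in one partial batch or split across several early-emitted ones, the final aggregation combines the partial values into the same total:

```rust
fn main() {
    // Group "a" shows up in two partial output batches because of early emission...
    let partial_counts = [2_u64, 3];
    // ...but the final aggregation sums the partial counts, so the result
    // matches counting all five rows in a single partial batch.
    let final_count: u64 = partial_counts.iter().sum();
    assert_eq!(final_count, 5);
}
```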
Which issue does this PR close?
Closes #1570
Rationale for this change
For hash aggregation, the sizes of group values and accumulators can become so large that they cause out-of-memory errors.
What changes are included in this PR?
This PR lets the hash aggregation operator spill large data to local disk using the Arrow IPC format. For every input `RecordBatch`, the memory manager checks whether the new input size fits within the configured memory limit. If it does not, the operator spills, and a streaming merge sort later reads the spilled data back. The streaming merge-sort machinery is reused from the sort operator (a sketch of the IPC round-trip appears at the end of this description).
Are these changes tested?
Yes
Are there any user-facing changes?
No
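As referenced in the change description above, here is a minimal, self-contained sketch of the Arrow IPC round-trip that spilling relies on (the file path and data are illustrative; the PR uses DataFusion's internal spill helpers rather than this exact code):

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    // Spill: serialize the batch to a local file in Arrow IPC format.
    let mut writer = FileWriter::try_new(File::create("/tmp/spill.arrow")?, &schema)?;
    writer.write(&batch)?;
    writer.finish()?;

    // Read back: the merge phase streams spilled batches from disk.
    let reader = FileReader::try_new(File::open("/tmp/spill.arrow")?, None)?;
    for batch in reader {
        assert_eq!(batch?.num_rows(), 3);
    }
    Ok(())
}
```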