
feat: Support spilling for hash aggregation #7400

Merged (21 commits) on Sep 15, 2023

Conversation

kazuyukitanimura (Contributor)
Which issue does this PR close?

Closes #1570

Rationale for this change

For hash aggregation, the group values and accumulators can grow large enough to cause out-of-memory errors.

What changes are included in this PR?

This PR lets the hash aggregation operator spill large data to local disk using the Arrow IPC format. For every input RecordBatch, the memory manager checks whether the new input size fits within the memory configuration. If not, the operator spills, and a streaming merge sort later reads the spilled data back. The streaming merge sort function is reused from the sort operator.
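The spill-then-merge flow described above can be sketched in miniature. This is a hypothetical, self-contained illustration, not DataFusion code: `SpillingAggregator` and its in-memory `Vec` "spill files" are stand-ins for the real operator's memory reservation, Arrow IPC spill files, and streaming merge.

```rust
use std::collections::BTreeMap;

// Illustrative sketch only: a grouping-sum aggregator that "spills" its
// sorted in-memory state whenever a (toy) memory limit is reached, then
// merges all sorted runs at the end.
struct SpillingAggregator {
    memory_limit: usize,             // max number of in-memory groups (toy memory manager)
    current: BTreeMap<String, i64>,  // in-memory partial aggregates, kept sorted by key
    spills: Vec<Vec<(String, i64)>>, // spilled runs, each sorted (stand-in for IPC files)
}

impl SpillingAggregator {
    fn new(memory_limit: usize) -> Self {
        Self { memory_limit, current: BTreeMap::new(), spills: Vec::new() }
    }

    // For every input row, check the memory limit; spill the current state if
    // admitting a new group would exceed it.
    fn update(&mut self, key: &str, value: i64) {
        if !self.current.contains_key(key) && self.current.len() >= self.memory_limit {
            let run: Vec<_> = std::mem::take(&mut self.current).into_iter().collect();
            self.spills.push(run);
        }
        *self.current.entry(key.to_string()).or_insert(0) += value;
    }

    // Read the sorted runs back and combine groups with equal keys
    // (stand-in for the streaming merge sort reused from the sort operator).
    fn finish(mut self) -> Vec<(String, i64)> {
        let mut merged: BTreeMap<String, i64> = BTreeMap::new();
        let last: Vec<_> = std::mem::take(&mut self.current).into_iter().collect();
        for run in self.spills.into_iter().chain(std::iter::once(last)) {
            for (k, v) in run {
                *merged.entry(k).or_insert(0) += v;
            }
        }
        merged.into_iter().collect()
    }
}
```

Because each run is already sorted, equal keys from different runs meet during the merge, which is what makes read-back aggregation possible without holding all groups in memory at once.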

Are these changes tested?

Yes

Are there any user-facing changes?

No

kazuyukitanimura (Contributor, Author)

Waiting for #7399 for the fmt issues

alamb (Contributor) commented Aug 24, 2023

Thank you @kazuyukitanimura -- I plan to review this PR tomorrow. I am very excited!

yjshen (Member) left a comment


Thanks @kazuyukitanimura for working on this! This is one of the last few missing cornerstones in DataFusion. Cheers!

kazuyukitanimura (Contributor, Author)

Just FYI, I will be traveling the entire next week. I may not be able to respond in time, but I plan to address comments the week after (9/5~).
Thank you in advance.
cc @sunchao @viirya

alamb (Contributor) left a comment


Thank you very much @kazuyukitanimura -- I think this is really nicely written. I left some comments for your consideration. I think we could simplify some of the traits and memory accounting and still keep most of the benefits of this PR.

I also would love to test this myself locally, but it occurs to me I can't actually do so because the datafusion CLI doesn't have any way to do it. I'll file a ticket.

From my perspective, the only things required before merging this PR are:

  1. Performance tests showing it doesn't slow down performance (I don't expect it to, and I could help with this)
  2. A few more tests that exercise the spill path after aggregating more than one batch of data (I have left a suggestion on how to do that with FairSpillPool)

Again, really nice work and thank you very much for the contribution

kazuyukitanimura (Contributor, Author)

Thank you @alamb for reviewing. I plan to work on addressing comments a week after next (9/5~) once I am back from my travel.

  1. Performance tests showing it doesn't slow down performance (I don't expect it to and I could help with this)

Regarding the benchmark, what would be the best way to proceed? I read https://github.com/apache/arrow-datafusion/blob/main/benchmarks/README.md
Is bench.sh run tpch a good fit for this PR's purpose? I am wondering if there is a GitHub Action / CI so that we can use the same machine for benchmarking...

alamb (Contributor) commented Aug 26, 2023

> Is bench.sh run tpch a good one for this PR purpose? I am wondering if there is a github action / CI so that we can use the same machine for benchmarking...

This is what I recommend.

I have some scripts in https://github.com/alamb/datafusion-benchmarking that I use to compare a branch to main, but they aren't super user friendly at the moment. I'll run this branch on a GCP machine and report back.

Thanks again

alamb (Contributor) commented Aug 29, 2023

Marking as draft to signify this PR has gotten feedback and is waiting to incorporate it before subsequent review

@alamb alamb marked this pull request as draft August 29, 2023 11:40
@kazuyukitanimura kazuyukitanimura marked this pull request as ready for review September 12, 2023 10:22
kazuyukitanimura (Contributor, Author)

Thank you all for the reviews! I think I addressed all of them. @alamb @yjshen @sunchao @viirya
There are some TODOs I plan to follow up on in future PRs. Any further improvements to this PR I plan to tack onto the next PR.

alamb (Contributor) commented Sep 15, 2023

> Thank you all for the reviews! I think addressed all of them. @alamb @yjshen @sunchao @viirya
> There are some TODOs I plan to follow up in the future PRs. For any further improvements on this PR, I plan to tack on the next PR.

Thank you @kazuyukitanimura - this is a really nice piece of technology, as well as a nice example of collaboration. I agree let's merge this and continue working on main / follow on PRs.

I filed #7571 to track adding spilling group benchmarks

@alamb alamb merged commit f1f8d79 into apache:main Sep 15, 2023
alamb (Contributor) commented Sep 15, 2023

> QQuery 15
>
> What's the reason behind QQuery 15 +2.36x faster?

@Dandandan I believe the reason is that Q15 is so fast (15 ms!) that it is susceptible to small perturbations, and the difference is largely measurement error:

│ QQuery 15 │ 13.57ms │ 14.29ms │ 1.05x slower │

kazuyukitanimura (Contributor, Author)

Great, thank you all again.

];
let expected = if spill {
    vec![
        "+---+-----+-----------------+",
jayzhan211 (Contributor) commented Sep 17, 2024


@kazuyukitanimura Hi, do you remember why the result of spill is different from the non-spill one?

A contributor replied:

I think the reason is that we are comparing the output of a partial aggregate, and when spilling we also have a lower desired batch size and hit the emit-early logic:

fn emit_early_if_necessary(&mut self) -> Result<()> {
    if self.group_values.len() >= self.batch_size
        && matches!(self.group_ordering, GroupOrdering::None)
        && matches!(self.mode, AggregateMode::Partial)
        && self.update_memory_reservation().is_err()
    {
        let n = self.group_values.len() / self.batch_size * self.batch_size;
        let batch = self.emit(EmitTo::First(n), false)?;
        self.exec_state = ExecutionState::ProducingOutput(batch);
    }
    Ok(())
}

viirya (Member) commented Sep 17, 2024

That's right. When spilling happens, it means we don't have enough memory to hold the original batch, so we do partial aggregation on smaller batches. The partial aggregation results will differ, but I think that doesn't change the final aggregation result.
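This invariant can be demonstrated with a small sketch. It is an illustration of the principle, not DataFusion code: `partial_sums` and `final_merge` are hypothetical names standing in for the Partial and Final aggregation phases.

```rust
use std::collections::BTreeMap;

// Partially aggregate each batch independently, like a Partial-mode
// aggregation: one map of per-group sums per input batch.
fn partial_sums(batches: &[Vec<(&str, i64)>]) -> Vec<BTreeMap<String, i64>> {
    batches
        .iter()
        .map(|batch| {
            let mut m = BTreeMap::new();
            for (k, v) in batch {
                *m.entry((*k).to_string()).or_insert(0) += v;
            }
            m
        })
        .collect()
}

// Final aggregation: merge all partial results by group key.
fn final_merge(partials: Vec<BTreeMap<String, i64>>) -> BTreeMap<String, i64> {
    let mut out = BTreeMap::new();
    for p in partials {
        for (k, v) in p {
            *out.entry(k).or_insert(0) += v;
        }
    }
    out
}
```

Splitting the same rows into more, smaller batches (as happens when spilling lowers the desired batch size) changes the per-batch partial output but not the merged final result, which is why only the partial-stage test expectations differ between the spill and non-spill cases.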

Labels: core (Core DataFusion crate), enhancement (New feature or request), physical-expr (Physical Expressions)

Successfully merging this pull request may close these issues.

Memory Limited GroupBy (Externalized / Spill)
9 participants