Consolidate BoundedAggregateStream
#6932
Conversation
@@ -1116,6 +1104,7 @@ fn create_accumulators(
        .collect::<Result<Vec<_>>>()
}

#[allow(dead_code)]
This will be removed in #6968
@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one
This design tries to keep all the ordering logic isolated from the main row_hash.rs logic, to manage the complexity
if let Err(e) = result {
    return Poll::Ready(Some(Err(e)));
// Do the grouping
The main state machine is updated to emit data when possible (so it needs to transition back and forth between input and emitting)
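To illustrate that back-and-forth, here is a minimal sketch of such a two-way state machine. The names (`ExecState`, `step`) and the boolean inputs are illustrative assumptions, not the PR's actual types:

```rust
// Minimal sketch of a group-by stream that alternates between consuming
// input and emitting completed groups (illustrative names, not the PR's
// actual types).
#[derive(Debug, PartialEq)]
enum ExecState {
    ReadingInput,
    ProducingOutput,
    Done,
}

// One driver step: if groups completed while reading, switch to emitting;
// after emitting, return to reading; once input is exhausted (and nothing
// is left to emit), finish.
fn step(state: ExecState, input_exhausted: bool, completed_groups: bool) -> ExecState {
    match state {
        ExecState::ReadingInput if completed_groups => ExecState::ProducingOutput,
        ExecState::ReadingInput if input_exhausted => ExecState::Done,
        ExecState::ReadingInput => ExecState::ReadingInput,
        ExecState::ProducingOutput => ExecState::ReadingInput,
        ExecState::Done => ExecState::Done,
    }
}

fn main() {
    // mid-stream: some groups are complete, so emit before reading more
    assert_eq!(step(ExecState::ReadingInput, false, true), ExecState::ProducingOutput);
    // after emitting, go back to consuming input
    assert_eq!(step(ExecState::ProducingOutput, false, false), ExecState::ReadingInput);
    // input exhausted with nothing pending: finish
    assert_eq!(step(ExecState::ReadingInput, true, false), ExecState::Done);
}
```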
/// Optional ordering information, that might allow groups to be
/// emitted from the hash table prior to seeing the end of the
/// input
group_ordering: GroupOrdering,
I am quite pleased that the extra ordering state is tracked in a single struct
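A hedged sketch of that idea (the variant names follow the discussion in this PR, but the payloads are simplified assumptions, not the actual fields):

```rust
// Sketch: all ordering state lives behind one type, so the main hash loop
// stays ordering-agnostic. Variant payloads are simplified assumptions.
#[derive(Debug)]
enum GroupOrdering {
    /// No useful input ordering; nothing can be emitted early
    None,
    /// Input sorted on a prefix of the group keys; groups before
    /// `current_sort` are complete
    Partial { current_sort: usize },
    /// Input fully sorted on the group keys; groups before `current`
    /// are complete
    Full { current: usize },
}

impl GroupOrdering {
    /// How many leading groups are complete and safe to emit early
    fn emit_to(&self) -> Option<usize> {
        match self {
            GroupOrdering::None => None,
            GroupOrdering::Partial { current_sort } => Some(*current_sort),
            GroupOrdering::Full { current } => Some(*current),
        }
    }
}

fn main() {
    assert_eq!(GroupOrdering::None.emit_to(), None);
    assert_eq!(GroupOrdering::Partial { current_sort: 3 }.emit_to(), Some(3));
    assert_eq!(GroupOrdering::Full { current: 7 }.emit_to(), Some(7));
}
```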
) -> Result<()> {
    // Convert the group keys into the row format
    // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available
    let group_rows = self.row_converter.convert_columns(group_values)?;
    let n_rows = group_rows.num_rows();

    // track memory used
    let group_values_size_pre = self.group_values.size();
    let scratch_size_pre = self.scratch_space.size();
    memory_delta.dec(self.state_size());
I needed to update the memory accounting logic because previously the group operator never decreased its memory reservation, but now that state is cleared early, it needs to shrink as well
// Update ordering information if necessary
let total_num_groups = self.group_values.num_rows();
if total_num_groups > starting_num_groups {
Here is where the ordering information is updated; the overhead is only a few checks per batch when there is no ordering
@@ -71,7 +71,7 @@ impl AccumulatorState {
    fn size(&self) -> usize {
        self.accumulator.size()
            + std::mem::size_of_val(self)
            + std::mem::size_of::<u32>() * self.indices.capacity()
            + self.indices.allocated_size()
This adapter needs to be updated to account for the fact that groups can be removed and thus memory freed
/// first group index with the sort_key
current_sort: usize,
/// The sort key of group_index `current_sort`
sort_key: OwnedRow,
I expect this code to be quite a bit faster than the current BoundedWindowAggregate, as it uses the row format rather than ScalarValue, but I don't know of any benchmarks of the streaming group by code
It occurs to me that a potentially faster way to detect the group boundaries would be to use the inequality kernel on the sort columns offset by one w.r.t. each other, ORing the results together, and then iterating over the set bits.
There would be some subtlety to handle nulls correctly, but it would likely be significantly faster and would not require converting to the row format
We could likely do something similar for window functions if we aren't already
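For illustration, here is the offset-comparison idea sketched on plain vectors (a hypothetical helper, not the Arrow kernels themselves; with Arrow one would slice each sort column by one, apply the inequality kernel, and OR the boolean results, with extra care for nulls):

```rust
// Sketch of the offset-by-one comparison idea on plain vectors
// (hypothetical helper; a real version would use Arrow's inequality kernel
// on sliced columns and OR the boolean arrays, handling nulls carefully).
fn group_boundaries(sort_columns: &[Vec<i32>]) -> Vec<usize> {
    let n = sort_columns.first().map_or(0, |c| c.len());
    let mut boundaries = Vec::new();
    if n > 0 {
        boundaries.push(0); // the first row always starts a group
    }
    for i in 1..n {
        // "OR the results together": a boundary wherever any column changed
        if sort_columns.iter().any(|c| c[i] != c[i - 1]) {
            boundaries.push(i);
        }
    }
    boundaries
}

fn main() {
    // two sort-key columns; a new group starts whenever either changes
    let cols = vec![vec![1, 1, 2, 2, 3], vec![0, 0, 0, 1, 1]];
    assert_eq!(group_boundaries(&cols), vec![0, 2, 3, 4]);
}
```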
This is an excellent idea -- I filed it as #7023 for follow on work.
From my perspective this PR will already be faster than the existing streaming group by (which uses ScalarValue to track the sort keys), so I think it is acceptable to merge as is
I have left minor comments, nothing important. This PR eases maintenance, improves readability, and decreases code size. Thanks @alamb for this great work.
Co-authored-by: Mustafa Akur <[email protected]>
…-datafusion into alamb/consolidated_streaming
BTW @mustafasrepo and @ozankabak, I meant to mention this before, but the tests in https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs helped me implement and debug this code very nicely 👌 Thank you for that
I plan to leave this open for at least another day. cc @mingmwang or @Dandandan or @yahoNanJing in case you are interested.
I plan to merge this tomorrow morning unless anyone else wants additional time to review
I'm not an authority on this code, but left some comments. I think this could possibly be implemented without needing the row format at all, which would likely be close to optimal from a performance standpoint
/// │┌───┐│ │ ┌──────────────┐ │ │ ┗━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━┛
/// ││ 0 ││ │ │ 123, "MA"    │ │ │   current_sort      sort_key
/// │└───┘│ │ └──────────────┘ │ │
/// │ ... │ │      ...         │ │ current_sort tracks the most
This states "most recent" but I think it means "oldest" or possibly "smallest"?
Agreed -- changed to 'smallest' in ddaf0e7
@@ -303,6 +322,16 @@ fn create_group_accumulator(
    }
}

/// Extracts a successful Ok(_) or returns Poll::Ready(Some(Err(e))) with errors
macro_rules! extract_ok {
One way to avoid this is to extract a sync function that returns a Result, allowing the use of `?`, and then map as necessary
I tried this - I am not sure it is better -- I will put up a follow on PR #7025
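For context, a minimal sketch of the macro's shape and why plain `?` does not work directly (a `poll_next`-style method returns `Poll<Option<Result<T>>>`, not `Result`); the names here are illustrative, not the PR's exact code:

```rust
use std::task::Poll;

// Sketch of the pattern under discussion (illustrative): the surrounding
// method returns Poll<Option<Result<T>>>, so `?` cannot be used directly.
// The macro unwraps Ok, or returns early with the error wrapped for the
// stream.
macro_rules! extract_ok {
    ($result:expr) => {
        match $result {
            Ok(v) => v,
            Err(e) => return Poll::Ready(Some(Err(e))),
        }
    };
}

// Hypothetical poll step using the macro
fn poll_step(input: Result<i32, String>) -> Poll<Option<Result<i32, String>>> {
    let v = extract_ok!(input); // early-returns on Err
    Poll::Ready(Some(Ok(v * 2)))
}

fn main() {
    assert_eq!(poll_step(Ok(3)), Poll::Ready(Some(Ok(6))));
    assert_eq!(
        poll_step(Err("boom".to_string())),
        Poll::Ready(Some(Err("boom".to_string())))
    );
}
```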
// If we can begin emitting rows, do so,
// otherwise keep consuming input
let to_emit = if self.input_done {
How would this branch be reached?
I agree -- `input_done` can't be true here as we just got a batch. I will change it to an assert. 372196e
}
EmitTo::First(n) => {
    // Clear out first n group keys by copying them to a new Rows.
    // TODO file some ticket in arrow-rs to make this more efficient?
I'm not sure this can be made efficient, but I also am not sure this aggregator needs to use the row format at all
In this case, the group_values also need to support "remove the first N values" even if we removed the row format from the partial order state
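To illustrate what "remove the first N values" implies for the store (a hypothetical Vec-backed sketch, not the row format):

```rust
// Sketch: emitting the first n groups means the backing store drops its
// first n entries, and every surviving group index shifts down by n
// (hypothetical Vec-backed store; the real code uses the row format).
fn emit_first_n(
    group_values: &mut Vec<String>,
    live_indices: &mut Vec<usize>,
    n: usize,
) -> Vec<String> {
    let emitted: Vec<String> = group_values.drain(0..n).collect();
    // surviving group indices must be rewritten to point at the new
    // positions of the remaining values
    for idx in live_indices.iter_mut() {
        *idx -= n;
    }
    emitted
}

fn main() {
    let mut values = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    let mut indices = vec![2]; // a live reference to group "c"
    let emitted = emit_first_n(&mut values, &mut indices, 2);
    assert_eq!(emitted, vec!["a".to_string(), "b".to_string()]);
    assert_eq!(values, vec!["c".to_string()]);
    assert_eq!(indices, vec![0]); // "c" moved to the front
}
```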
pub fn new_groups(
    &mut self,
    group_indices: &[usize],
    batch_hashes: &[u64],
In the case of GroupOrderingFull it seems unnecessary to compute hashes at all -- we could just start a new group whenever the sort key changes?
That is a (very) good point -- noted in #7023 for follow on work
///
/// 3. Call `delta.inc(size_thing.size())`
#[derive(Debug, Default)]
pub struct MemoryDelta {
In https://github.com/apache/arrow-datafusion/pull/7016/files#r1267407414 I opted to just remove the delta-based accounting, as I couldn't see a compelling reason to keep it: it was computing total memory usage, using that to compute deltas, and then using the deltas to update the total memory usage
The reason there was delta accounting is that in previous incarnations, calculating the size of the overall grouping operator was a significant bottleneck (for groupings with large cardinality groups). Specifically, `Accumulator::size()` showed up on a bunch of profiles.
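A hedged sketch of delta-based accounting (illustrative names, not DataFusion's exact API; the point is that only the net change is applied to the reservation, instead of recomputing the operator's total size after every batch):

```rust
// Sketch of delta-based memory accounting (illustrative, not DataFusion's
// exact API): record how much a mutation grew or shrank a structure and
// apply only that net change to the memory reservation.
#[derive(Debug, Default)]
struct MemoryDelta {
    delta: isize,
}

impl MemoryDelta {
    /// Account for the size of a structure about to be mutated or freed
    fn dec(&mut self, bytes: usize) {
        self.delta -= bytes as isize;
    }
    /// Account for the size of the structure after mutation
    fn inc(&mut self, bytes: usize) {
        self.delta += bytes as isize;
    }
    /// Net change to apply to the reservation (can be negative now that
    /// groups are emitted early and their memory freed)
    fn net(&self) -> isize {
        self.delta
    }
}

fn main() {
    let mut delta = MemoryDelta::default();
    delta.dec(100); // old size of some state, before mutating it
    delta.inc(160); // its new size afterwards
    assert_eq!(delta.net(), 60);
}
```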
However, now that you mention it, the delta accounting is now done per-`Accumulator` in the adapter, so the actual group by hash operator can probably remove the delta accounting.
Here is my proposal:
- I will add some comments on the rationale for delta accounting to this PR
- I merge this PR
- You can remove the delta accounting from the main GroupByHash operator in Extract GroupValues (#6969) #7016, or I will do so, and we can run some benchmarks to make sure it doesn't have a performance impact
Comments in e7dc2ae
Given #7016 (comment) showed no regression I will opt to simply remove it
Co-authored-by: Raphael Taylor-Davies <[email protected]>
…-datafusion into alamb/consolidated_streaming
Thank you for the comments @tustvold -- they were very helpful.
Which issue does this PR close?
Closes #6798
Rationale for this change
See #6798 -- basically BoundedAggregateStream is a lot of copy/paste code from the old group by hash implementation, which prevents me from deleting RowFormat.

Also, I think this will have the very nice side benefit of being significantly faster (due to not using ScalarValue to compare order by keys). And it takes less code.
What changes are included in this PR?
- Add GroupOrdering to encapsulate tracking the state of any ordering
- Update GroupedAggregateStream to use GroupOrdering when available
- Remove the BoundedAggregateStream code

I plan to remove the Row format as a follow on PR (#6968) to keep this one reasonably sized
Potential follow on work
There is a tradeoff between how fast the hash table is flushed (e.g. for low latency streaming use cases) and the overhead of generating output / resetting state. In this PR I took the same approach as BoundedAggregateStream, which is to flush any completed groups as soon as possible; it might be worth adding a config parameter to trade off between latency and efficiency.
Performance Results
I ran the benchmarks and the results showed basically no change (I am seeing significant variance in test speeds)
Are these changes tested?
Existing tests
Are there any user-facing changes?
Faster performance, smaller code size