Consolidate BoundedAggregateStream #6932

Merged: 12 commits into apache:main from alamb/consolidated_streaming on Jul 19, 2023

Conversation

@alamb (Contributor) commented Jul 12, 2023:

Which issue does this PR close?

Closes #6798

Rationale for this change

See #6798 -- basically BoundedAggregateStream is a lot of copy/pasted code from the old group by hash implementation, and its existence prevents me from deleting RowFormat.

Also, I think this will have the very nice side benefit of being significantly faster (due to not using ScalarValue to compare order by keys).

And it takes less code

What changes are included in this PR?

  1. Add GroupOrdering to encapsulate tracking the state of any ordering
  2. Update GroupedAggregateStream to use GroupOrdering when available
  3. Remove BoundedAggregateStream code

I plan to remove the Row format in a follow-on PR (#6968) to keep this one reasonably sized.
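
For orientation, here is a rough sketch of the shape `GroupOrdering` could take. The `current_sort` and `sort_key` fields are quoted from the diff discussed below; the variant set and the `GroupOrderingFull` field are illustrative assumptions rather than the PR's exact definitions:

```rust
use arrow::row::OwnedRow;

/// What is known about the ordering of the group keys (sketch)
pub enum GroupOrdering {
    /// Input is not sorted on the group keys; no group can be emitted
    /// before the input is exhausted
    None,
    /// Input is sorted on a prefix of the group keys
    Partial(GroupOrderingPartial),
    /// Input is sorted on all group keys; a group is complete as soon
    /// as its sort key changes
    Full(GroupOrderingFull),
}

/// Ordering state for the prefix-sorted case (fields quoted from the PR)
pub struct GroupOrderingPartial {
    /// first group index with the sort_key
    current_sort: usize,
    /// The sort key of group_index `current_sort`
    sort_key: OwnedRow,
}

/// Ordering state for the fully-sorted case (field is an assumption)
pub struct GroupOrderingFull {
    /// index of the most recently created group; all earlier groups
    /// can no longer receive input
    current: usize,
}
```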

Potential follow on work

There is a tradeoff between how quickly the hash table is flushed (e.g. for low latency streaming use cases) and the overhead of generating output / resetting state. In this PR I took the same approach as BoundedAggregateStream, which is to flush any completed groups as soon as possible. It might be worth adding a config parameter to trade off latency against efficiency, as sketched below.
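
To make the tradeoff concrete, a purely hypothetical sketch of such a config parameter -- nothing like this exists in the PR or in DataFusion's config:

```rust
/// Hypothetical knob for the latency/efficiency tradeoff described
/// above; this struct does not exist, it only illustrates the idea
pub struct StreamingEmitConfig {
    /// Flush completed groups only once at least this many have
    /// accumulated: `1` matches this PR (emit as soon as possible),
    /// larger values amortize output generation and state resets
    pub min_completed_groups: usize,
}

impl Default for StreamingEmitConfig {
    fn default() -> Self {
        Self {
            min_completed_groups: 1,
        }
    }
}
```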

Performance Results

I ran the benchmarks and the results showed basically no change (I am seeing significant variance in test speeds).

--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ alamb_consolidated_streaming ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  538.24ms │                     542.37ms │     no change │
│ QQuery 2     │  154.76ms │                     159.65ms │     no change │
│ QQuery 3     │  158.91ms │                     163.08ms │     no change │
│ QQuery 4     │  116.72ms │                     114.99ms │     no change │
│ QQuery 5     │  378.90ms │                     388.11ms │     no change │
│ QQuery 6     │   41.26ms │                      40.88ms │     no change │
│ QQuery 7     │  853.86ms │                     835.10ms │     no change │
│ QQuery 8     │  241.75ms │                     241.55ms │     no change │
│ QQuery 9     │  540.67ms │                     548.33ms │     no change │
│ QQuery 10    │  305.35ms │                     331.19ms │  1.08x slower │
│ QQuery 11    │  164.17ms │                     163.74ms │     no change │
│ QQuery 12    │  165.40ms │                     167.82ms │     no change │
│ QQuery 13    │  314.07ms │                     292.19ms │ +1.07x faster │
│ QQuery 14    │   49.98ms │                      46.03ms │ +1.09x faster │
│ QQuery 15    │   52.75ms │                      58.21ms │  1.10x slower │
│ QQuery 16    │  160.10ms │                     161.86ms │     no change │
│ QQuery 17    │  920.34ms │                     836.55ms │ +1.10x faster │
│ QQuery 18    │ 1570.58ms │                    1522.12ms │     no change │
│ QQuery 19    │  166.14ms │                     168.50ms │     no change │
│ QQuery 20    │  312.45ms │                     331.00ms │  1.06x slower │
│ QQuery 21    │ 1059.63ms │                    1062.09ms │     no change │
│ QQuery 22    │   84.26ms │                      85.10ms │     no change │
└──────────────┴───────────┴──────────────────────────────┴───────────────┘

Are these changes tested?

Existing tests

Are there any user-facing changes?

Faster performance, smaller code size

@github-actions bot added the physical-expr (Physical Expressions), core (Core DataFusion crate), and sqllogictest (SQL Logic Tests (.slt)) labels Jul 12, 2023
@alamb force-pushed the alamb/consolidated_streaming branch 4 times, most recently from e429743 to 2da356c on July 14, 2023 18:41
@alamb force-pushed the alamb/consolidated_streaming branch 2 times, most recently from 2c013a1 to aeac248 on July 15, 2023 12:44
@github-actions bot removed the sqllogictest SQL Logic Tests (.slt) label Jul 15, 2023
@alamb force-pushed the alamb/consolidated_streaming branch 2 times, most recently from b42fc16 to 17cc833 on July 15, 2023 21:06
@alamb force-pushed the alamb/consolidated_streaming branch from 17cc833 to 5b874da on July 15, 2023 21:10
@@ -1116,6 +1104,7 @@ fn create_accumulators(
.collect::<Result<Vec<_>>>()
}

#[allow(dead_code)]

@alamb (author):

This will be removed in #6968

@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one

@alamb (author):

This design tries to keep all the ordering state isolated from the main row_hash.rs logic, to manage the complexity.
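
To illustrate the isolation, a minimal sketch of the fully-ordered case, building on the `GroupOrdering` enum sketched in the description above (the method names here are assumptions):

```rust
impl GroupOrderingFull {
    /// Record the groups created while processing a batch. With a
    /// total order on the group keys, every group except the most
    /// recently seen one can no longer receive input.
    fn new_groups(&mut self, group_indices: &[usize]) {
        if let Some(max) = group_indices.iter().max() {
            self.current = *max;
        }
    }

    /// Number of leading groups that are complete and safe to emit
    fn emittable(&self) -> usize {
        self.current
    }
}
```

row_hash.rs then only needs a call or two per batch, with all the ordering bookkeeping hidden behind the `GroupOrdering` dispatch.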


if let Err(e) = result {
return Poll::Ready(Some(Err(e)));
// Do the grouping

@alamb (author):

The main state machine is updated to emit data when possible (so it needs to transition back and forth between input and emitting)
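
One way to structure that back-and-forth (a sketch; the state names are assumptions, not necessarily what the PR uses):

```rust
use arrow::record_batch::RecordBatch;

/// Sketch of a poll-loop state machine that alternates between
/// consuming input and emitting completed groups
enum ExecutionState {
    /// Pull the next input batch; switch to ProducingOutput whenever
    /// the ordering information says some leading groups are complete
    ReadingInput,
    /// Emit this batch, then return to ReadingInput (or to Done once
    /// the input is exhausted)
    ProducingOutput(RecordBatch),
    /// All input consumed and all groups emitted
    Done,
}
```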

/// Optional ordering information, that might allow groups to be
/// emitted from the hash table prior to seeing the end of the
/// input
group_ordering: GroupOrdering,

@alamb (author):

I am quite pleased that the extra ordering state is tracked in a single struct

) -> Result<()> {
// Convert the group keys into the row format
// Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available
let group_rows = self.row_converter.convert_columns(group_values)?;
let n_rows = group_rows.num_rows();

// track memory used
let group_values_size_pre = self.group_values.size();
let scratch_size_pre = self.scratch_space.size();
memory_delta.dec(self.state_size());

@alamb (author):

I needed to update the memory accounting logic because previously the group operator never decreased its memory reservation; now that state is cleared early, the reservation needs to shrink as well.
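
A sketch of delta-based accounting consistent with the `inc`/`dec` calls visible in the diff (the field names and the `update` method are assumptions):

```rust
use datafusion::error::Result;
use datafusion::execution::memory_pool::MemoryReservation;

/// Accumulates memory growth and shrinkage while processing a batch,
/// then applies the net change to the operator's reservation once
#[derive(Debug, Default)]
pub struct MemoryDelta {
    grown: usize,
    shrunk: usize,
}

impl MemoryDelta {
    /// Record newly allocated bytes
    pub fn inc(&mut self, bytes: usize) {
        self.grown += bytes;
    }

    /// Record freed bytes, e.g. for groups that were emitted early
    pub fn dec(&mut self, bytes: usize) {
        self.shrunk += bytes;
    }

    /// Apply the accumulated change to the reservation
    pub fn update(self, reservation: &mut MemoryReservation) -> Result<()> {
        reservation.try_grow(self.grown)?;
        reservation.shrink(self.shrunk);
        Ok(())
    }
}
```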


// Update ordering information if necessary
let total_num_groups = self.group_values.num_rows();
if total_num_groups > starting_num_groups {

@alamb (author):

Here is where the ordering information is updated; the overhead is only a few checks per batch when there is no ordering.

@@ -71,7 +71,7 @@ impl AccumulatorState {
fn size(&self) -> usize {
self.accumulator.size()
+ std::mem::size_of_val(self)
+ std::mem::size_of::<u32>() * self.indices.capacity()
+ self.indices.allocated_size()

@alamb (author):

This adapter needs to be updated to account for the fact that groups can be removed and thus memory freed
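
The idea behind the `allocated_size` call in the diff, as a sketch (DataFusion's actual helper lives elsewhere and may differ):

```rust
/// Extension trait reporting the heap memory a Vec actually occupies
pub trait VecAllocExt {
    fn allocated_size(&self) -> usize;
}

impl<T> VecAllocExt for Vec<T> {
    /// Size based on capacity, not length: reserved-but-unused space
    /// still counts against the memory budget, and the figure shrinks
    /// when entries for emitted groups are freed
    fn allocated_size(&self) -> usize {
        std::mem::size_of::<T>() * self.capacity()
    }
}
```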

/// first group index with the sort_key
current_sort: usize,
/// The sort key of group_index `current_sort`
sort_key: OwnedRow,

@alamb (author):

I expect this code to be quite a bit faster than the current BoundedAggregateStream as it uses the row format rather than ScalarValue, but I don't know of any benchmarks of the streaming group by code.

@tustvold (reviewer):

It occurs to me that a potentially faster way to detect the group boundaries would be to apply the inequality kernel to the sort columns offset by one row with respect to each other, OR the results together, and then iterate over the set bits.

There would be some subtlety to handle nulls correctly, but it would likely be significantly faster and would not require converting to the row format

We could likely do something similar for window functions if we aren't already
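
A sketch of that idea using arrow's comparison kernels as they existed at the time (`neq_dyn` and `or`); the null subtlety mentioned above is deliberately glossed over here:

```rust
use arrow::array::{ArrayRef, BooleanArray};
use arrow::compute::{neq_dyn, or};
use arrow::error::Result;

/// Find the row indices where a new group starts, by comparing each
/// sort column against itself offset by one row and ORing the results.
/// Nulls need extra care in real code: `neq_dyn` yields null, not
/// true/false, when either side is null.
fn group_boundaries(sort_columns: &[ArrayRef]) -> Result<Vec<usize>> {
    let n = sort_columns[0].len();
    if n < 2 {
        return Ok(vec![]);
    }
    let mut changed = BooleanArray::from(vec![false; n - 1]);
    for col in sort_columns {
        // rows 0..n-1 vs rows 1..n: true where adjacent values differ
        let diff = neq_dyn(&col.slice(0, n - 1), &col.slice(1, n - 1))?;
        changed = or(&changed, &diff)?;
    }
    // a set bit at i means a new group starts at row i + 1
    Ok(changed
        .iter()
        .enumerate()
        .filter_map(|(i, c)| (c == Some(true)).then_some(i + 1))
        .collect())
}
```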

@alamb (author):

This is an excellent idea -- I filed it as #7023 for follow on work

From my perspective this PR will already be faster than the existing streaming group by (which uses ScalarValue to track the sort keys), so I think it is acceptable to merge as is.

@mustafasrepo (Contributor) left a comment:

I have left minor comments, nothing important. This PR eases maintenance, improves readability, and decreases code size. Thanks @alamb for this great work.

@alamb (author) commented Jul 17, 2023:

BTW @mustafasrepo and @ozankabak I meant to mention this before but the tests in https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs helped me implement and debug this code very nicely 👌

Thank you for that

@alamb (author) left a comment:

I plan to leave this open for at least another day. cc @mingmwang or @Dandandan or @yahoNanJing in case you are interested.

@alamb (author) commented Jul 18, 2023:

I plan to merge this in tomorrow morning unless anyone else wants additional time to review

@tustvold (Contributor) left a comment:

I'm not an authority on this code, but left some comments. I think this could possibly be implemented without needing the row format at all, which would likely be close to optimal from a performance standpoint

datafusion/core/src/physical_plan/aggregates/order/full.rs (comment thread outdated and resolved)
/// │┌───┐│ │ ┌──────────────┐ │ │ ┗━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━┛
/// ││ 0 ││ │ │ 123, "MA" │ │ │ current_sort sort_key
/// │└───┘│ │ └──────────────┘ │ │
/// │ ... │ │ ... │ │ current_sort tracks the most

@tustvold (reviewer):

This states "most recent" but I think it means "oldest" or possibly "smallest"?

@alamb (author):

Agreed -- changed to 'smallest' in ddaf0e7


@@ -303,6 +322,16 @@ fn create_group_accumulator(
}
}

/// Extracts a successful Ok(_) or returns Poll::Ready(Some(Err(e))) with errors
macro_rules! extract_ok {

@tustvold (reviewer):

One way to avoid this is to extract a sync function that returns a Result, allowing the use of ? and then map as necessary
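
A sketch of that alternative (`group_aggregate_batch` is a stand-in name, not necessarily the PR's actual function):

```rust
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;

/// The fallible per-batch work as a sync fn returning Result, so `?`
/// can be used freely inside instead of a macro
fn group_aggregate_batch(_batch: &RecordBatch) -> Result<()> {
    // hash the group keys, update accumulators, update ordering state...
    Ok(())
}

// Then poll_next maps the error into the stream item type at a single
// point, mirroring the code quoted earlier in this conversation:
//
//     if let Err(e) = self.group_aggregate_batch(&batch) {
//         return Poll::Ready(Some(Err(e)));
//     }
```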

@alamb (author):

I tried this -- I am not sure it is better. I will put up a follow-on PR: #7025


// If we can begin emitting rows, do so,
// otherwise keep consuming input
let to_emit = if self.input_done {

@tustvold (reviewer):

How would this branch be reached?

@alamb (author):

I agree -- input_done can't be true here as we just got a batch. I will change it to an assert. 372196e

}
EmitTo::First(n) => {
// Clear out first n group keys by copying them to a new Rows.
// TODO file some ticket in arrow-rs to make this more efficent?

@tustvold (reviewer):

I'm not sure this can be made efficient, but I also am not sure this aggregator needs to use the row format at all

@alamb (author):

In this case, the group_values would also need to support "remove the first N values" even if we removed the row format from the partial-order state.
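
For reference, a sketch of that "remove the first n" operation on the row-format group values, matching the copy-based approach in the quoted TODO (arrow's `Rows` has no drain API):

```rust
use arrow::row::{RowConverter, Rows};

/// Rebuild the group keys keeping only rows n.. (a sketch; the real
/// code lives inline in the aggregation stream)
fn remove_first_n(converter: &RowConverter, group_values: &Rows, n: usize) -> Rows {
    // capacity hints: the number of surviving rows; data grows as needed
    let mut remaining = converter.empty_rows(group_values.num_rows() - n, 0);
    for row in group_values.iter().skip(n) {
        remaining.push(row);
    }
    remaining
}
```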

pub fn new_groups(
&mut self,
group_indices: &[usize],
batch_hashes: &[u64],

@tustvold (reviewer):

In the case of GroupOrderingFull it seems unnecessary to compute hashes at all -- couldn't we just group based on when the sort key changes?

@alamb (author):

That is a (very) good point -- noted in #7023 for follow on work

///
/// 3. Call `delta.inc(size_thing.size())`
#[derive(Debug, Default)]
pub struct MemoryDelta {

@tustvold (Contributor) commented Jul 19, 2023:

In https://github.com/apache/arrow-datafusion/pull/7016/files#r1267407414 I opted to just remove the delta-based accounting, as I couldn't see a compelling reason to keep it around: it was computing total memory usage, using that to compute deltas, and then using the deltas to update the total memory usage.

@alamb (author) commented Jul 19, 2023:

The reason there was delta accounting is that in previous incarnations, calculating the size of the overall grouping operator was a significant bottleneck (for groupings with a large number of distinct groups).

Specifically Accumulator::size() showed up on a bunch of profiles.

@alamb (author):

However, now that you mention it, the delta accounting is now done per-Accumulator in the adapter, so the main group by hash operator can probably drop the delta accounting.

Here is my proposal:

  1. I will add some comments on the rationale for delta accounting to this PR
  2. I merge this PR
  3. You can remove the delta accounting from the main GroupByHash operator in Extract GroupValues (#6969) #7016, or I will do so, and we can run some benchmarks to make sure it doesn't have a performance impact

@alamb (author):

Comments in e7dc2ae

@tustvold (reviewer):

Given that #7016 (comment) showed no regression, I will opt to simply remove it.

@alamb (author) left a comment:

Thank you for the comments @tustvold -- they were very helpful.


@alamb alamb merged commit 1810a15 into apache:main Jul 19, 2023
@alamb alamb deleted the alamb/consolidated_streaming branch July 19, 2023 10:50
Merging this pull request closed: Reduce duplication between BoundedAggregateStream and GroupedHashAggregateStream (#6798)