Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First and Last Accumulators should update with state row excluding is_set flag #7565

Merged
merged 5 commits into from
Sep 16, 2023

Conversation

viirya
Copy link
Member

@viirya viirya commented Sep 15, 2023

Which issue does this PR close?

Closes #7567.

Closes #7559

Rationale for this change

First and Last Accumulators would update itself from first/last row during merging state batches (e.g., merge_batch). However, currently it takes the whole state row (which includes is_set flag) into update_with_new_row which in turn takes all columns except for first one into orderings (so existing is_set is put there) and adds is_set flag. This ends with double is_set flags if state is called on the accumulators which have merged state batches.

Normally this is not an issue because state is not called once aggregation enters the stage of merging state batches. But in #7400, where spilling happens to call state on such accumulators to get its states and spill into disk. This leads to a hacky fix there and we should fix these two accumulators accordingly to avoid the hacky way.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the physical-expr Physical Expressions label Sep 15, 2023
@viirya
Copy link
Member Author

viirya commented Sep 15, 2023

cc @kazuyukitanimura @alamb

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me -- thank you @viirya ❤️

datafusion/physical-expr/src/aggregate/first_last.rs Outdated Show resolved Hide resolved
let merged_state = first_accumulator.state()?;
assert_eq!(merged_state.len(), state1.len());

// LastValueAccumulator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably can remove some common code here but not a blocker

Copy link
Contributor

@kazuyukitanimura kazuyukitanimura left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks a better solution. One question that I am still trying to understand...

@@ -237,13 +231,17 @@ impl Accumulator for FirstValueAccumulator {
};
if !ordered_states[0].is_empty() {
let first_row = get_row_at_idx(&ordered_states, 0)?;
let first_ordering = &first_row[1..];
// When collecting orderings, we exclude the is_set flag from the state.
let first_ordering = &first_row[1..is_set_idx];
let sort_options = get_sort_options(&self.ordering_req);
// Either there is no existing value, or there is an earlier version in new data.
if !self.is_set
|| compare_rows(first_ordering, &self.orderings, &sort_options)?.is_lt()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we need to do anything about compare_rows(first_ordering, &self.orderings, &sort_options) since now first_ordering is shorter?

Copy link
Member Author

@viirya viirya Sep 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not shorter. Actually, this fix makes them same length. Previously first_ordering has more one element (is_set) but it is not in orderings.

Copy link
Contributor

@kazuyukitanimura kazuyukitanimura Sep 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I found the only time self.orderings can be longer is right after the creation by try_new(). But at the time it is self.is_set = false, so it will not hit compare_rows. Then self.orderings is updated by first_ordering. So I agree that this should work.

@@ -237,13 +231,17 @@ impl Accumulator for FirstValueAccumulator {
};
if !ordered_states[0].is_empty() {
let first_row = get_row_at_idx(&ordered_states, 0)?;
let first_ordering = &first_row[1..];
// When collecting orderings, we exclude the is_set flag from the state.
let first_ordering = &first_row[1..is_set_idx];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_set_idx is an index for states but does first_row have the same index? Can they be different because filtered_states may have filtered rows?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please never mind, this should be the same

Copy link
Member Author

@viirya viirya Sep 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't filtered_states come from states? Why they have different index?? They are simply partial aggregation inputs. Do you think there are different rows in partial aggregation inputs?

@alamb alamb merged commit 61ed374 into apache:main Sep 16, 2023
@viirya
Copy link
Member Author

viirya commented Sep 16, 2023

Thank you @alamb @sunchao @kazuyukitanimura

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

First and Last Accumulators should update with state row excluding is_set flag
4 participants