First and Last Accumulators should update with state row excluding is_set flag #7565
This makes sense to me -- thank you @viirya ❤️
let merged_state = first_accumulator.state()?;
assert_eq!(merged_state.len(), state1.len());

// LastValueAccumulator
we probably can remove some common code here but not a blocker
Co-authored-by: Andrew Lamb <[email protected]>
08b689b to 10f70f3
This looks like a better solution. One question that I am still trying to understand...
@@ -237,13 +231,17 @@ impl Accumulator for FirstValueAccumulator {
         };
         if !ordered_states[0].is_empty() {
             let first_row = get_row_at_idx(&ordered_states, 0)?;
-            let first_ordering = &first_row[1..];
+            // When collecting orderings, we exclude the is_set flag from the state.
+            let first_ordering = &first_row[1..is_set_idx];
             let sort_options = get_sort_options(&self.ordering_req);
             // Either there is no existing value, or there is an earlier version in new data.
             if !self.is_set
                 || compare_rows(first_ordering, &self.orderings, &sort_options)?.is_lt()
I am wondering if we need to do anything about `compare_rows(first_ordering, &self.orderings, &sort_options)` since now `first_ordering` is shorter?
It is not shorter. Actually, this fix makes them the same length. Previously `first_ordering` had one more element (`is_set`), which is not in `orderings`.
Actually, I found that the only time `self.orderings` can be longer is right after creation by `try_new()`. But at that point `self.is_set = false`, so it will not hit `compare_rows`. Then `self.orderings` is updated by `first_ordering`. So I agree that this should work.
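To make the length argument concrete, here is a minimal sketch. It assumes a state row laid out as value, then ordering columns, then the `is_set` flag, and uses plain `i64` values in place of the real scalar type. The function name `split_orderings` is hypothetical and is not part of the PR.

```rust
/// Hypothetical helper, for illustration only.
/// Assumes `row` is laid out as [value, ordering columns..., is_set flag]
/// and that `is_set_idx` is the position of the flag in that row.
fn split_orderings(row: &[i64], is_set_idx: usize) -> (&[i64], &[i64]) {
    // Old slicing: everything after the value, so the flag leaks into the orderings.
    let with_flag = &row[1..];
    // New slicing: stop right before the flag, leaving only the ordering columns.
    let without_flag = &row[1..is_set_idx];
    (with_flag, without_flag)
}
```

With a row of `[42, 7, 9, 1]` and `is_set_idx = 3`, the first slice has three elements and the second has two, which matches the number of ordering columns that `self.orderings` would hold.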
@@ -237,13 +231,17 @@ impl Accumulator for FirstValueAccumulator {
         };
         if !ordered_states[0].is_empty() {
             let first_row = get_row_at_idx(&ordered_states, 0)?;
-            let first_ordering = &first_row[1..];
+            // When collecting orderings, we exclude the is_set flag from the state.
+            let first_ordering = &first_row[1..is_set_idx];
`is_set_idx` is an index for `states`, but does `first_row` have the same index? Can they be different because `filtered_states` may have filtered rows?
Please never mind, this should be the same
Doesn't `filtered_states` come from `states`? Why would they have a different index? They are simply partial aggregation inputs. Do you think there are different rows in partial aggregation inputs?
Thank you @alamb @sunchao @kazuyukitanimura
Which issue does this PR close?
Closes #7567.
Closes #7559
Rationale for this change
The First and Last accumulators update themselves from the first/last row when merging state batches (e.g., in `merge_batch`). However, they currently pass the whole state row (which includes the `is_set` flag) into `update_with_new_row`, which in turn takes all columns except the first one into `orderings` (so the existing `is_set` is put there) and adds the `is_set` flag. This results in a doubled `is_set` flag if `state` is called on accumulators that have merged state batches.

Normally this is not an issue because `state` is not called once aggregation enters the stage of merging state batches. But in #7400, spilling calls `state` on such accumulators to get their states and spill them to disk. This led to a hacky fix there, and we should fix these two accumulators accordingly to avoid the hacky approach.

What changes are included in this PR?
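As a rough illustration of the doubled flag described in the rationale, and of excluding it when merging, the sketch below models a first-value accumulator with plain `i64` values. The type `FirstValueSketch` and the methods `merge_row_buggy` and `merge_row_fixed` are hypothetical names for this example and are not DataFusion's actual code. The ordering comparison is omitted for brevity.

```rust
/// Hypothetical, simplified model of a first-value accumulator (illustration only).
#[derive(Debug, Default)]
struct FirstValueSketch {
    value: i64,
    orderings: Vec<i64>,
    is_set: bool,
}

impl FirstValueSketch {
    /// Assumed state layout: [value, orderings..., is_set encoded as 0/1].
    fn state(&self) -> Vec<i64> {
        let mut out = vec![self.value];
        out.extend_from_slice(&self.orderings);
        out.push(self.is_set as i64);
        out
    }

    /// Expects a row of [value, orderings...]: every column after the
    /// first is stored as an ordering column.
    fn update_with_new_row(&mut self, row: &[i64]) {
        self.value = row[0];
        self.orderings = row[1..].to_vec();
        self.is_set = true;
    }

    /// Models the old behaviour: the whole state row, flag included, is
    /// passed on, so the flag ends up stored among the orderings.
    fn merge_row_buggy(&mut self, state_row: &[i64]) {
        self.update_with_new_row(state_row);
    }

    /// Models the fix: drop the trailing flag before updating.
    fn merge_row_fixed(&mut self, state_row: &[i64]) {
        let is_set_idx = state_row.len() - 1;
        self.update_with_new_row(&state_row[..is_set_idx]);
    }
}
```

Merging a partial state `[42, 7, 1]` through `merge_row_buggy` and then calling `state` yields four columns, with the flag appearing twice. Going through `merge_row_fixed` yields the original three columns.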
Are these changes tested?
Are there any user-facing changes?