Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify update_skip_aggregation_probe method #12332

Merged
merged 3 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl GroupsAccumulatorAdapter {

let values_to_accumulate =
slice_and_maybe_filter(&values, opt_filter.as_ref(), offsets)?;
(f)(state.accumulator.as_mut(), &values_to_accumulate)?;
f(state.accumulator.as_mut(), &values_to_accumulate)?;

// clear out the state so they are empty for next
// iteration
Expand Down
22 changes: 4 additions & 18 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,6 @@ impl SkipAggregationProbe {
self.should_skip
}

/// Provides an ability to externally set `should_skip` flag
/// to `false` and prohibit further state updates
fn forbid_skipping(&mut self) {
self.should_skip = false;
self.is_locked = true;
}

/// Record the number of rows that were output directly without aggregation
fn record_skipped(&mut self, batch: &RecordBatch) {
self.skipped_aggregation_rows.add(batch.num_rows());
Expand Down Expand Up @@ -1016,19 +1009,12 @@ impl GroupedHashAggregateStream {
}

/// Updates skip aggregation probe state.
///
/// In case stream has any spills, the probe is forcefully set to
/// forbid aggregation skipping, and locked, since spilling resets
/// total number of unique groups.
///
/// Note: currently spilling is not supported for Partial aggregation
fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
if !self.spill_state.spills.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now partial aggregation only do early emit indeed, but I guess this check is trying to be defensive, if someone decides to do spilling in the partial stage also, the early emit logic won't be break 🤔 Maybe we can leave an assertion here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #7400, the author tried spilling in partial stage, but he was asked to remove that. See #7400 (comment).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this check is trying to be defensive

Yes, that was the idea -- these features are mutually exclusive, with current implementation of spilling, but if they both are triggered (which never happens for now -- probably the note should be more informative) it shouldn't break query execution.

I think the change is reasonable since current code is redundant indeed, but not sure about assertion -- I suppose it's better to return not_implemented / internal error instead of panicking.

And it's probably worth to retain the note on why skip partial is incompatible with spilling.

Copy link
Contributor

@Rachelint Rachelint Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add another assert to ensure and emphasize that the update_state is only called in Partial mode?

IMO, when we are sure that some branches are actually unreacheable, assertion may be nice that it can let us easier to find the bug through tests?

...
assert!(self.spill_state.spills.is_empty() && self.mode == AggregateMode::Partial);
probe.update_state(input_rows, self.group_values.len());
...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree -- adding an assertion would be great. Can you perhaps make a PR to do so?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have submitted a pr about this #12640

probe.forbid_skipping();
} else {
probe.update_state(input_rows, self.group_values.len());
}
// Skip aggregation probe is not supported if stream has any spills,
// currently spilling is not supported for Partial aggregation
assert!(self.spill_state.spills.is_empty());
probe.update_state(input_rows, self.group_values.len());
};
}

Expand Down