-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Row accumulator support update Scalar values #6003
Changes from all commits
a64cf24
f233aff
b95c769
5bfc141
4a35fad
a57d4a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,16 +31,15 @@ use futures::stream::{Stream, StreamExt}; | |
|
||
use crate::execution::context::TaskContext; | ||
use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; | ||
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation}; | ||
use crate::physical_plan::aggregates::{ | ||
evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem, | ||
AggregateMode, PhysicalGroupBy, RowAccumulatorItem, | ||
}; | ||
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; | ||
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr}; | ||
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; | ||
|
||
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation}; | ||
use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray, UInt32Builder}; | ||
use arrow::array::*; | ||
use arrow::compute::{cast, filter}; | ||
use arrow::datatypes::{DataType, Schema, UInt32Type}; | ||
use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch}; | ||
|
@@ -53,6 +52,7 @@ use datafusion_row::layout::RowLayout; | |
use datafusion_row::reader::{read_row, RowReader}; | ||
use datafusion_row::{MutableRecordBatch, RowType}; | ||
use hashbrown::raw::RawTable; | ||
use itertools::izip; | ||
|
||
/// Grouping aggregate with row-format aggregation states inside. | ||
/// | ||
|
@@ -410,7 +410,7 @@ impl GroupedHashAggregateStream { | |
|
||
// Update the accumulator results, according to row_aggr_state. | ||
#[allow(clippy::too_many_arguments)] | ||
fn update_accumulators( | ||
fn update_accumulators_using_batch( | ||
&mut self, | ||
groups_with_rows: &[usize], | ||
offsets: &[usize], | ||
|
@@ -491,6 +491,55 @@ impl GroupedHashAggregateStream { | |
Ok(()) | ||
} | ||
|
||
// Update the accumulator results, according to row_aggr_state. | ||
fn update_accumulators_using_scalar( | ||
&mut self, | ||
groups_with_rows: &[usize], | ||
row_values: &[Vec<ArrayRef>], | ||
row_filter_values: &[Option<ArrayRef>], | ||
) -> Result<()> { | ||
let filter_bool_array = row_filter_values | ||
.iter() | ||
.map(|filter_opt| match filter_opt { | ||
Some(f) => Ok(Some(as_boolean_array(f)?)), | ||
None => Ok(None), | ||
}) | ||
.collect::<Result<Vec<_>>>()?; | ||
|
||
for group_idx in groups_with_rows { | ||
let group_state = &mut self.aggr_state.group_states[*group_idx]; | ||
let mut state_accessor = | ||
RowAccessor::new_from_layout(self.row_aggr_layout.clone()); | ||
state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice()); | ||
for idx in &group_state.indices { | ||
for (accumulator, values_array, filter_array) in izip!( | ||
self.row_accumulators.iter_mut(), | ||
row_values.iter(), | ||
filter_bool_array.iter() | ||
) { | ||
if values_array.len() == 1 { | ||
let scalar_value = | ||
col_to_scalar(&values_array[0], filter_array, *idx as usize)?; | ||
accumulator.update_scalar(&scalar_value, &mut state_accessor)?; | ||
} else { | ||
let scalar_values = values_array | ||
.iter() | ||
.map(|array| { | ||
col_to_scalar(array, filter_array, *idx as usize) | ||
}) | ||
.collect::<Result<Vec<_>>>()?; | ||
accumulator | ||
.update_scalar_values(&scalar_values, &mut state_accessor)?; | ||
} | ||
} | ||
} | ||
// clear the group indices in this group | ||
group_state.indices.clear(); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Perform group-by aggregation for the given [`RecordBatch`]. | ||
/// | ||
/// If successful, this returns the additional number of bytes that were allocated during this process. | ||
|
@@ -516,35 +565,50 @@ impl GroupedHashAggregateStream { | |
for group_values in &group_by_values { | ||
let groups_with_rows = | ||
self.update_group_state(group_values, &mut allocated)?; | ||
|
||
// Collect all indices + offsets based on keys in this vec | ||
let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0); | ||
let mut offsets = vec![0]; | ||
let mut offset_so_far = 0; | ||
for &group_idx in groups_with_rows.iter() { | ||
let indices = &self.aggr_state.group_states[group_idx].indices; | ||
batch_indices.append_slice(indices); | ||
offset_so_far += indices.len(); | ||
offsets.push(offset_so_far); | ||
// Decide the accumulators update mode, use scalar value to update the accumulators when all of the conditions are meet: | ||
// 1) The aggregation mode is Partial or Single | ||
// 2) There is not normal aggregation expressions | ||
// 3) The number of affected groups is high (entries in `aggr_state` have rows need to update). Usually the high cardinality case | ||
if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single) | ||
&& normal_aggr_input_values.is_empty() | ||
&& normal_filter_values.is_empty() | ||
&& groups_with_rows.len() >= batch.num_rows() / 10 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This magic number There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, will make it configurable. |
||
{ | ||
self.update_accumulators_using_scalar( | ||
&groups_with_rows, | ||
&row_aggr_input_values, | ||
&row_filter_values, | ||
)?; | ||
} else { | ||
// Collect all indices + offsets based on keys in this vec | ||
let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0); | ||
let mut offsets = vec![0]; | ||
let mut offset_so_far = 0; | ||
for &group_idx in groups_with_rows.iter() { | ||
let indices = &self.aggr_state.group_states[group_idx].indices; | ||
batch_indices.append_slice(indices); | ||
offset_so_far += indices.len(); | ||
offsets.push(offset_so_far); | ||
} | ||
let batch_indices = batch_indices.finish(); | ||
|
||
let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?; | ||
let normal_values = | ||
get_at_indices(&normal_aggr_input_values, &batch_indices)?; | ||
let row_filter_values = | ||
get_optional_filters(&row_filter_values, &batch_indices); | ||
let normal_filter_values = | ||
get_optional_filters(&normal_filter_values, &batch_indices); | ||
self.update_accumulators_using_batch( | ||
&groups_with_rows, | ||
&offsets, | ||
&row_values, | ||
&normal_values, | ||
&row_filter_values, | ||
&normal_filter_values, | ||
&mut allocated, | ||
)?; | ||
} | ||
let batch_indices = batch_indices.finish(); | ||
|
||
let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?; | ||
let normal_values = | ||
get_at_indices(&normal_aggr_input_values, &batch_indices)?; | ||
let row_filter_values = | ||
get_optional_filters(&row_filter_values, &batch_indices); | ||
let normal_filter_values = | ||
get_optional_filters(&normal_filter_values, &batch_indices); | ||
self.update_accumulators( | ||
&groups_with_rows, | ||
&offsets, | ||
&row_values, | ||
&normal_values, | ||
&row_filter_values, | ||
&normal_filter_values, | ||
&mut allocated, | ||
)?; | ||
} | ||
allocated += self | ||
.row_converter | ||
|
@@ -793,3 +857,21 @@ fn slice_and_maybe_filter( | |
}; | ||
Ok(filtered_arrays) | ||
} | ||
|
||
/// This method is similar to Scalar::try_from_array except for the Null handling. | ||
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)] | ||
fn col_to_scalar( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can replace this function with /// This method is similar to Scalar::try_from_array except for the Null handling.
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
fn col_to_scalar(
array: &ArrayRef,
filter: &Option<&BooleanArray>,
row_index: usize,
) -> Result<ScalarValue> {
if array.is_null(row_index) {
return Ok(ScalarValue::Null);
}
if let Some(filter) = filter {
if !filter.value(row_index) {
return Ok(ScalarValue::Null);
}
}
let mut res = ScalarValue::try_from_array(array, row_index)?;
if res.is_null() {
res = ScalarValue::Null;
}
Ok(res)
}
|
||
array: &ArrayRef, | ||
filter: &Option<&BooleanArray>, | ||
row_index: usize, | ||
) -> Result<ScalarValue> { | ||
if array.is_null(row_index) { | ||
return Ok(ScalarValue::Null); | ||
} | ||
if let Some(filter) = filter { | ||
if !filter.value(row_index) { | ||
return Ok(ScalarValue::Null); | ||
} | ||
} | ||
ScalarValue::try_from_array(array, row_index) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems you don't need to wrap the result in
Ok
/Result
here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the
as_boolean_array
returns a Result here.