Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Row accumulator support update Scalar values #6003

Merged
merged 6 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1454,7 +1454,7 @@ impl std::hash::Hash for ScalarValue {
/// return a reference to the values array and the index into it for a
/// dictionary array
#[inline]
fn get_dict_value<K: ArrowDictionaryKeyType>(
pub fn get_dict_value<K: ArrowDictionaryKeyType>(
array: &dyn Array,
index: usize,
) -> (&ArrayRef, Option<usize>) {
Expand Down
146 changes: 114 additions & 32 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ use futures::stream::{Stream, StreamExt};

use crate::execution::context::TaskContext;
use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
};
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};

use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray, UInt32Builder};
use arrow::array::*;
use arrow::compute::{cast, filter};
use arrow::datatypes::{DataType, Schema, UInt32Type};
use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
Expand All @@ -53,6 +52,7 @@ use datafusion_row::layout::RowLayout;
use datafusion_row::reader::{read_row, RowReader};
use datafusion_row::{MutableRecordBatch, RowType};
use hashbrown::raw::RawTable;
use itertools::izip;

/// Grouping aggregate with row-format aggregation states inside.
///
Expand Down Expand Up @@ -410,7 +410,7 @@ impl GroupedHashAggregateStream {

// Update the accumulator results, according to row_aggr_state.
#[allow(clippy::too_many_arguments)]
fn update_accumulators(
fn update_accumulators_using_batch(
&mut self,
groups_with_rows: &[usize],
offsets: &[usize],
Expand Down Expand Up @@ -491,6 +491,55 @@ impl GroupedHashAggregateStream {
Ok(())
}

// Update the accumulator results, according to row_aggr_state.
fn update_accumulators_using_scalar(
&mut self,
groups_with_rows: &[usize],
row_values: &[Vec<ArrayRef>],
row_filter_values: &[Option<ArrayRef>],
) -> Result<()> {
let filter_bool_array = row_filter_values
.iter()
.map(|filter_opt| match filter_opt {
Some(f) => Ok(Some(as_boolean_array(f)?)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems you don't need to wrap the result in Ok / Result here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the as_boolean_array returns a Result here.

None => Ok(None),
})
.collect::<Result<Vec<_>>>()?;

for group_idx in groups_with_rows {
let group_state = &mut self.aggr_state.group_states[*group_idx];
let mut state_accessor =
RowAccessor::new_from_layout(self.row_aggr_layout.clone());
state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
for idx in &group_state.indices {
for (accumulator, values_array, filter_array) in izip!(
self.row_accumulators.iter_mut(),
row_values.iter(),
filter_bool_array.iter()
) {
if values_array.len() == 1 {
let scalar_value =
col_to_scalar(&values_array[0], filter_array, *idx as usize)?;
accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
} else {
let scalar_values = values_array
.iter()
.map(|array| {
col_to_scalar(array, filter_array, *idx as usize)
})
.collect::<Result<Vec<_>>>()?;
accumulator
.update_scalar_values(&scalar_values, &mut state_accessor)?;
}
}
}
// clear the group indices in this group
group_state.indices.clear();
}

Ok(())
}

/// Perform group-by aggregation for the given [`RecordBatch`].
///
/// If successful, this returns the additional number of bytes that were allocated during this process.
Expand All @@ -516,35 +565,50 @@ impl GroupedHashAggregateStream {
for group_values in &group_by_values {
let groups_with_rows =
self.update_group_state(group_values, &mut allocated)?;

// Collect all indices + offsets based on keys in this vec
let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
let mut offsets = vec![0];
let mut offset_so_far = 0;
for &group_idx in groups_with_rows.iter() {
let indices = &self.aggr_state.group_states[group_idx].indices;
batch_indices.append_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
// Decide the accumulators update mode, use scalar value to update the accumulators when all of the conditions are meet:
// 1) The aggregation mode is Partial or Single
// 2) There is not normal aggregation expressions
// 3) The number of affected groups is high (entries in `aggr_state` have rows need to update). Usually the high cardinality case
if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
&& normal_aggr_input_values.is_empty()
&& normal_filter_values.is_empty()
&& groups_with_rows.len() >= batch.num_rows() / 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This magic number 10 is used to identify high cardinality. Shall we make it configurable or document how this 10 is chosen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will make it configurable.

{
self.update_accumulators_using_scalar(
&groups_with_rows,
&row_aggr_input_values,
&row_filter_values,
)?;
} else {
// Collect all indices + offsets based on keys in this vec
let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
let mut offsets = vec![0];
let mut offset_so_far = 0;
for &group_idx in groups_with_rows.iter() {
let indices = &self.aggr_state.group_states[group_idx].indices;
batch_indices.append_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.finish();

let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
let normal_values =
get_at_indices(&normal_aggr_input_values, &batch_indices)?;
let row_filter_values =
get_optional_filters(&row_filter_values, &batch_indices);
let normal_filter_values =
get_optional_filters(&normal_filter_values, &batch_indices);
self.update_accumulators_using_batch(
&groups_with_rows,
&offsets,
&row_values,
&normal_values,
&row_filter_values,
&normal_filter_values,
&mut allocated,
)?;
}
let batch_indices = batch_indices.finish();

let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
let normal_values =
get_at_indices(&normal_aggr_input_values, &batch_indices)?;
let row_filter_values =
get_optional_filters(&row_filter_values, &batch_indices);
let normal_filter_values =
get_optional_filters(&normal_filter_values, &batch_indices);
self.update_accumulators(
&groups_with_rows,
&offsets,
&row_values,
&normal_values,
&row_filter_values,
&normal_filter_values,
&mut allocated,
)?;
}
allocated += self
.row_converter
Expand Down Expand Up @@ -793,3 +857,21 @@ fn slice_and_maybe_filter(
};
Ok(filtered_arrays)
}

/// This method is similar to Scalar::try_from_array except for the Null handling.
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
fn col_to_scalar(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can replace this function with

/// This method is similar to Scalar::try_from_array except for the Null handling.
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
fn col_to_scalar(
    array: &ArrayRef,
    filter: &Option<&BooleanArray>,
    row_index: usize,
) -> Result<ScalarValue> {
    if array.is_null(row_index) {
        return Ok(ScalarValue::Null);
    }
    if let Some(filter) = filter {
        if !filter.value(row_index) {
            return Ok(ScalarValue::Null);
        }
    }
    let mut res = ScalarValue::try_from_array(array, row_index)?;
    if res.is_null() {
        res = ScalarValue::Null;
    }
    Ok(res)
}

ScalarValue::is_null matches both [ScalarValue::Null] and [ScalarValue::Type(None)].

array: &ArrayRef,
filter: &Option<&BooleanArray>,
row_index: usize,
) -> Result<ScalarValue> {
if array.is_null(row_index) {
return Ok(ScalarValue::Null);
}
if let Some(filter) = filter {
if !filter.value(row_index) {
return Ok(ScalarValue::Null);
}
}
ScalarValue::try_from_array(array, row_index)
}
51 changes: 51 additions & 0 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,57 @@ async fn count_multi_expr() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn count_multi_expr_group_by() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Int32, true),
Field::new("c3", DataType::Int32, true),
]));

let data = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![
Some(0),
None,
Some(1),
Some(2),
None,
])),
Arc::new(Int32Array::from(vec![
Some(1),
Some(1),
Some(0),
None,
None,
])),
Arc::new(Int32Array::from(vec![
Some(10),
Some(10),
Some(10),
Some(10),
Some(10),
])),
],
)?;

let ctx = SessionContext::new();
ctx.register_batch("test", data)?;
let sql = "SELECT c3, count(c1, c2) FROM test group by c3";
let actual = execute_to_batches(&ctx, sql).await;

let expected = vec![
"+----+------------------------+",
"| c3 | COUNT(test.c1,test.c2) |",
"+----+------------------------+",
"| 10 | 2 |",
"+----+------------------------+",
];
assert_batches_sorted_eq!(expected, &actual);
Ok(())
}

#[tokio::test]
async fn simple_avg() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
Expand Down
23 changes: 19 additions & 4 deletions datafusion/physical-expr/src/aggregate/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,24 @@ impl RowAccumulator for AvgRowAccumulator {
self.state_index() + 1,
accessor,
&sum::sum_batch(values, &self.sum_datatype)?,
)?;
Ok(())
)
}

fn update_scalar_values(
&mut self,
values: &[ScalarValue],
accessor: &mut RowAccessor,
) -> Result<()> {
let value = &values[0];
sum::update_avg_to_row(self.state_index(), accessor, value)
}

fn update_scalar(
&mut self,
value: &ScalarValue,
accessor: &mut RowAccessor,
) -> Result<()> {
sum::update_avg_to_row(self.state_index(), accessor, value)
}

fn merge_batch(
Expand All @@ -315,8 +331,7 @@ impl RowAccumulator for AvgRowAccumulator {

// sum
let difference = sum::sum_batch(&states[1], &self.sum_datatype)?;
sum::add_to_row(self.state_index() + 1, accessor, &difference)?;
Ok(())
sum::add_to_row(self.state_index() + 1, accessor, &difference)
}

fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
Expand Down
25 changes: 25 additions & 0 deletions datafusion/physical-expr/src/aggregate/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,31 @@ impl RowAccumulator for CountRowAccumulator {
Ok(())
}

fn update_scalar_values(
&mut self,
values: &[ScalarValue],
accessor: &mut RowAccessor,
) -> Result<()> {
if !values.iter().any(|s| matches!(s, ScalarValue::Null)) {
accessor.add_u64(self.state_index, 1)
}
Ok(())
}

fn update_scalar(
&mut self,
value: &ScalarValue,
accessor: &mut RowAccessor,
) -> Result<()> {
match value {
ScalarValue::Null => {
// do not update the accumulator
}
_ => accessor.add_u64(self.state_index, 1),
}
Ok(())
}

fn merge_batch(
&mut self,
states: &[ArrayRef],
Expand Down
40 changes: 38 additions & 2 deletions datafusion/physical-expr/src/aggregate/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,9 @@ macro_rules! min_max_v2 {
ScalarValue::Decimal128(rhs, ..) => {
typed_min_max_v2!($INDEX, $ACC, rhs, i128, $OP)
}
ScalarValue::Null => {
// do nothing
}
e => {
return Err(DataFusionError::Internal(format!(
"MIN/MAX is not expected to receive scalars of incompatible types {:?}",
Expand Down Expand Up @@ -647,8 +650,24 @@ impl RowAccumulator for MaxRowAccumulator {
) -> Result<()> {
let values = &values[0];
let delta = &max_batch(values)?;
max_row(self.index, accessor, delta)?;
Ok(())
max_row(self.index, accessor, delta)
}

fn update_scalar_values(
&mut self,
values: &[ScalarValue],
accessor: &mut RowAccessor,
) -> Result<()> {
let value = &values[0];
max_row(self.index, accessor, value)
}

fn update_scalar(
&mut self,
value: &ScalarValue,
accessor: &mut RowAccessor,
) -> Result<()> {
max_row(self.index, accessor, value)
}

fn merge_batch(
Expand Down Expand Up @@ -894,6 +913,23 @@ impl RowAccumulator for MinRowAccumulator {
Ok(())
}

fn update_scalar_values(
&mut self,
values: &[ScalarValue],
accessor: &mut RowAccessor,
) -> Result<()> {
let value = &values[0];
min_row(self.index, accessor, value)
}

fn update_scalar(
&mut self,
value: &ScalarValue,
accessor: &mut RowAccessor,
) -> Result<()> {
min_row(self.index, accessor, value)
}

fn merge_batch(
&mut self,
states: &[ArrayRef],
Expand Down
Loading