diff --git a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs deleted file mode 100644 index a89ef3aaffc87..0000000000000 --- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs +++ /dev/null @@ -1,1072 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This file implements streaming aggregation on ordered GROUP BY expressions. -//! Generated output will itself have an ordering and the executor can run with -//! bounded memory, ensuring composability in streaming cases. - -use std::cmp::min; -use std::ops::Range; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::vec; - -use ahash::RandomState; -use futures::ready; -use futures::stream::{Stream, StreamExt}; -use hashbrown::raw::RawTable; -use itertools::izip; - -use crate::physical_plan::aggregates::{ - evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode, - AggregationOrdering, GroupByOrderMode, PhysicalGroupBy, RowAccumulatorItem, -}; -use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; -use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr}; -use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; -use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; -use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; -use datafusion_execution::TaskContext; - -use crate::physical_plan::aggregates::utils::{ - aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters, - read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState, -}; -use arrow::array::{new_null_array, ArrayRef, UInt32Builder}; -use arrow::compute::{cast, SortColumn}; -use arrow::datatypes::DataType; -use arrow::row::{OwnedRow, RowConverter, SortField}; -use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; -use datafusion_common::cast::as_boolean_array; -use datafusion_common::utils::{evaluate_partition_ranges, get_row_at_idx}; -use datafusion_common::{Result, ScalarValue}; -use datafusion_expr::Accumulator; -use datafusion_physical_expr::hash_utils::create_hashes; -use datafusion_row::accessor::RowAccessor; -use datafusion_row::layout::RowLayout; - -use super::AggregateExec; - -/// Grouping aggregate with row-format aggregation states inside. -/// -/// For each aggregation entry, we use: -/// - an [Arrow-row] that represents the grouping keys, enabling fast hash computation and comparison directly on the raw bytes. -/// - a [WordAligned] row that stores the aggregation state, designed to be CPU-friendly when every field is updated frequently. -/// -/// The architecture is the following: -/// -/// 1.
For each input RecordBatch, update the aggregation states corresponding to every grouping key that appears in the batch. -/// 2. At the end of the aggregation (e.g. at the end of the batches in a partition), the accumulator converts its state to a RecordBatch of a single row. -/// 3. The RecordBatches of all accumulators are merged (`concatenate` in `rust/arrow`) together to a single RecordBatch. -/// 4. The state's RecordBatch is `merge`d to a new state. -/// 5. The state is mapped to the final value. -/// -/// [Arrow-row]: OwnedRow -/// [WordAligned]: datafusion_row::layout -pub(crate) struct BoundedAggregateStream { - schema: SchemaRef, - input: SendableRecordBatchStream, - mode: AggregateMode, - - normal_aggr_expr: Vec<Arc<dyn AggregateExpr>>, - /// Aggregate expressions not supporting row accumulation - normal_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>, - /// Filter expression for each normal aggregate expression - normal_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>, - - /// Aggregate expressions supporting row accumulation - row_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>, - /// Filter expression for each row aggregate expression - row_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>, - row_accumulators: Vec<RowAccumulatorItem>, - row_converter: RowConverter, - row_aggr_schema: SchemaRef, - row_aggr_layout: Arc<RowLayout>, - - group_by: PhysicalGroupBy, - - aggr_state: AggregationState, - exec_state: ExecutionState, - baseline_metrics: BaselineMetrics, - random_state: RandomState, - /// size to be used for resulting RecordBatches - batch_size: usize, - /// threshold for using `ScalarValue`s to update - /// accumulators during high-cardinality aggregations for each input batch. - scalar_update_factor: usize, - /// if the result is chunked into batches, - /// last offset is preserved for continuation. - row_group_skip_position: usize, - /// keeps the output-column range for each accumulator: - /// the first element in the array corresponds to normal accumulators, - /// the second element in the array corresponds to row accumulators - indices: [Vec<Range<usize>>; 2], - /// Information on how the input of this group is ordered - aggregation_ordering: AggregationOrdering, - /// Has this stream finished producing output - is_end: bool, -} - -impl BoundedAggregateStream { - /// Create a new BoundedAggregateStream - pub fn new( - agg: &AggregateExec, - context: Arc<TaskContext>, - partition: usize, - aggregation_ordering: AggregationOrdering, // Stores algorithm mode and output ordering - ) -> Result<Self> { - let agg_schema = Arc::clone(&agg.schema); - let agg_group_by = agg.group_by.clone(); - let agg_filter_expr = agg.filter_expr.clone(); - - let batch_size = context.session_config().batch_size(); - let scalar_update_factor = context.session_config().agg_scalar_update_factor(); - let input = agg.input.execute(partition, Arc::clone(&context))?; - let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); - - let timer = baseline_metrics.elapsed_compute().timer(); - - let mut start_idx = agg_group_by.expr.len(); - let mut row_aggr_expr = vec![]; - let mut row_agg_indices = vec![]; - let mut row_aggregate_expressions = vec![]; - let mut row_filter_expressions = vec![]; - let mut normal_aggr_expr = vec![]; - let mut normal_agg_indices = vec![]; - let mut normal_aggregate_expressions = vec![]; - let mut normal_filter_expressions = vec![]; - // The expressions to evaluate the batch, one vec of expressions per aggregation. - // Assuming create_schema() always puts group columns in front of aggregation columns, we set - // the starting aggregate column index (start_idx) to the group expression count.
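As an aside, the column-range bookkeeping this comment describes (group columns first, then one contiguous column range per aggregate) can be sketched in isolation. This is an illustrative, self-contained rendering of the idea, not the PR's API:

```rust
use std::ops::Range;

/// Illustrative only: compute the output-column range of each aggregate,
/// assuming `n_group_cols` group columns come first and aggregate `i`
/// produces `state_lens[i]` state columns (1 in Final/Single modes).
fn aggregate_ranges(n_group_cols: usize, state_lens: &[usize]) -> Vec<Range<usize>> {
    let mut start = n_group_cols;
    state_lens
        .iter()
        .map(|&len| {
            let range = start..start + len;
            start += len;
            range
        })
        .collect()
}

fn main() {
    // Two group columns, then a 2-field partial state and a 1-field state.
    assert_eq!(aggregate_ranges(2, &[2, 1]), vec![2..4, 4..5]);
}
```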
- let all_aggregate_expressions = - aggregates::aggregate_expressions(&agg.aggr_expr, &agg.mode, start_idx)?; - let filter_expressions = match agg.mode { - AggregateMode::Partial | AggregateMode::Single => agg_filter_expr, - AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] - } - }; - for ((expr, others), filter) in agg - .aggr_expr - .iter() - .zip(all_aggregate_expressions.into_iter()) - .zip(filter_expressions.into_iter()) - { - let n_fields = match agg.mode { - // In partial aggregation, we keep additional fields in order to successfully - // merge aggregation results downstream. - AggregateMode::Partial => expr.state_fields()?.len(), - _ => 1, - }; - // Stores range of each expression: - let aggr_range = Range { - start: start_idx, - end: start_idx + n_fields, - }; - if expr.row_accumulator_supported() { - row_aggregate_expressions.push(others); - row_filter_expressions.push(filter.clone()); - row_agg_indices.push(aggr_range); - row_aggr_expr.push(expr.clone()); - } else { - normal_aggregate_expressions.push(others); - normal_filter_expressions.push(filter.clone()); - normal_agg_indices.push(aggr_range); - normal_aggr_expr.push(expr.clone()); - } - start_idx += n_fields; - } - - let row_accumulators = aggregates::create_row_accumulators(&row_aggr_expr)?; - - let row_aggr_schema = aggr_state_schema(&row_aggr_expr); - - let group_schema = group_schema(&agg_schema, agg_group_by.expr.len()); - let row_converter = RowConverter::new( - group_schema - .fields() - .iter() - .map(|f| SortField::new(f.data_type().clone())) - .collect(), - )?; - - let row_aggr_layout = Arc::new(RowLayout::new(&row_aggr_schema)); - - let name = format!("BoundedAggregateStream[{partition}]"); - let aggr_state = AggregationState { - reservation: MemoryConsumer::new(name).register(context.memory_pool()), - map: RawTable::with_capacity(0), - ordered_group_states: Vec::with_capacity(0), - }; - - timer.done(); - - let exec_state = ExecutionState::ReadingInput; - - Ok(BoundedAggregateStream { - schema: agg_schema, - input, - mode: agg.mode, - normal_aggr_expr, - normal_aggregate_expressions, - normal_filter_expressions, - row_aggregate_expressions, - row_filter_expressions, - row_accumulators, - row_converter, - row_aggr_schema, - row_aggr_layout, - group_by: agg_group_by, - aggr_state, - exec_state, - baseline_metrics, - random_state: Default::default(), - batch_size, - scalar_update_factor, - row_group_skip_position: 0, - indices: [normal_agg_indices, row_agg_indices], - is_end: false, - aggregation_ordering, - }) - } -} - -impl Stream for BoundedAggregateStream { - type Item = Result<RecordBatch>; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); - - loop { - match self.exec_state { - ExecutionState::ReadingInput => { - match ready!(self.input.poll_next_unpin(cx)) { - // new batch to aggregate - Some(Ok(batch)) => { - let timer = elapsed_compute.timer(); - let result = self.group_aggregate_batch(batch); - timer.done();
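The next step registers the batch's memory with the pool only after the aggregation has already consumed it. A self-contained sketch of that accounting pattern (this is not DataFusion's MemoryReservation API, whose try_grow the code below calls; it is just the shape of the idea):

```rust
/// Illustrative sketch of "account after use": memory is consumed first,
/// then the pool is asked to grow, so the pool may briefly overshoot but
/// the bookkeeping stays simple (whole batch accounted, or none of it).
struct Reservation {
    used: usize,
    limit: usize,
}

impl Reservation {
    fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        if self.used + additional > self.limit {
            Err(format!(
                "memory limit exceeded: {} used + {} requested > {} limit",
                self.used, additional, self.limit
            ))
        } else {
            self.used += additional;
            Ok(())
        }
    }
}

fn main() {
    let mut reservation = Reservation { used: 0, limit: 1024 };
    assert!(reservation.try_grow(512).is_ok()); // after the memory is in use
    assert!(reservation.try_grow(1024).is_err()); // the query would abort here
}
```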
- - // Allocate memory. This happens AFTER we actually used the memory, but it simplifies the overall accounting, and we are OK with - // overshooting a bit. This also means we either store the whole record batch or none of it. - let result = result.and_then(|allocated| { - self.aggr_state.reservation.try_grow(allocated) - }); - - if let Err(e) = result { - return Poll::Ready(Some(Err(e))); - } - } - // inner had error, return to caller - Some(Err(e)) => return Poll::Ready(Some(Err(e))), - // inner is done, switch to producing output - None => { - let states = self.aggr_state.ordered_group_states.iter_mut(); - for state in states { - state.status = GroupStatus::CanEmit; - } - self.exec_state = ExecutionState::ProducingOutput; - } - } - } - - ExecutionState::ProducingOutput => { - let timer = elapsed_compute.timer(); - let result = self.create_batch_from_map(); - - timer.done(); - - match result { - // made output - Ok(Some(result)) => { - let batch = result.record_output(&self.baseline_metrics); - self.row_group_skip_position += batch.num_rows(); - // try to read more input - self.exec_state = ExecutionState::ReadingInput; - self.prune(); - return Poll::Ready(Some(Ok(batch))); - } - // end of output - Ok(None) => { - self.exec_state = ExecutionState::Done; - } - // error making output - Err(error) => return Poll::Ready(Some(Err(error))), - } - } - ExecutionState::Done => return Poll::Ready(None), - } - } - } -} - -impl RecordBatchStream for BoundedAggregateStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -/// This utility object encapsulates the row object, the hash and the group -/// indices for a group. This information is used when executing streaming -/// GROUP BY calculations. -struct GroupOrderInfo { - /// The group by key - owned_row: OwnedRow, - /// the hash value of the group - hash: u64, - /// the range of row indices in the input batch that belong to this group - range: Range<usize>, -} - -impl BoundedAggregateStream { - /// Update the aggr_state hash table according to group_by values - /// (result of group_by_expressions) when group by expressions are - /// fully ordered. - fn update_fully_ordered_group_state( - &mut self, - group_values: &[ArrayRef], - per_group_order_info: Vec<GroupOrderInfo>, - allocated: &mut usize, - ) -> Result<Vec<usize>> { - // 1.1 construct the key from the group values - // 1.2 construct the mapping key if it does not exist - // 1.3 add the row's index to `indices` - - // track which entries in `aggr_state` have rows in this batch to aggregate - let mut groups_with_rows = vec![]; - - let AggregationState { - map: row_map, - ordered_group_states, - ..
} = &mut self.aggr_state; - - for GroupOrderInfo { - owned_row, - hash, - range, - } in per_group_order_info - { - let entry = row_map.get_mut(hash, |(_hash, group_idx)| { - // verify that a group that we are inserting with hash is - // actually the same key value as the group in - // existing_idx (aka group_values @ row) - let ordered_group_state = &ordered_group_states[*group_idx]; - let group_state = &ordered_group_state.group_state; - owned_row.row() == group_state.group_by_values.row() - }); - - match entry { - // Existing entry for this group value - Some((_hash, group_idx)) => { - let group_state = &mut ordered_group_states[*group_idx].group_state; - - // 1.3 - if group_state.indices.is_empty() { - groups_with_rows.push(*group_idx); - }; - for row in range.start..range.end { - // remember this row - group_state.indices.push_accounted(row as u32, allocated); - } - } - // 1.2 Need to create new entry - None => { - let accumulator_set = - aggregates::create_accumulators(&self.normal_aggr_expr)?; - // Save the value of the ordering columns as Vec<ScalarValue> - let row = get_row_at_idx(group_values, range.start)?; - let ordered_columns = self - .aggregation_ordering - .order_indices - .iter() - .map(|idx| row[*idx].clone()) - .collect::<Vec<_>>(); - - // Add new entry to group_states and save newly created index - let group_state = GroupState { - group_by_values: owned_row, - aggregation_buffer: vec![ - 0; - self.row_aggr_layout.fixed_part_width() - ], - accumulator_set, - indices: (range.start as u32..range.end as u32) - .collect::<Vec<u32>>(), // 1.3 - }; - let group_idx = ordered_group_states.len(); - - // NOTE: do NOT include the `OrderedGroupState` struct size in here because this is captured by - // `group_states` (see allocation down below) - *allocated += std::mem::size_of_val(&group_state.group_by_values) - + (std::mem::size_of::<u8>() - * group_state.aggregation_buffer.capacity()) - + (std::mem::size_of::<u32>() * group_state.indices.capacity()); - - // Allocation done by normal accumulators - *allocated += (std::mem::size_of::<Box<dyn Accumulator>>() - * group_state.accumulator_set.capacity()) - + group_state - .accumulator_set - .iter() - .map(|accu| accu.size()) - .sum::<usize>(); - - // for hasher function, use precomputed hash value - row_map.insert_accounted( - (hash, group_idx), - |(hash, _group_index)| *hash, - allocated, - ); - - let ordered_group_state = OrderedGroupState { - group_state, - ordered_columns, - status: GroupStatus::GroupInProgress, - hash, - }; - ordered_group_states.push_accounted(ordered_group_state, allocated); - - groups_with_rows.push(group_idx); - } - }; - } - Ok(groups_with_rows) - } - - // Update the aggr_state according to group_by values (result of group_by_expressions) - fn update_group_state( - &mut self, - group_values: &[ArrayRef], - allocated: &mut usize, - ) -> Result<Vec<usize>> { - // 1.1 construct the key from the group values - // 1.2 construct the mapping key if it does not exist - // 1.3 add the row's index to `indices` - - // track which entries in `aggr_state` have rows in this batch to aggregate - let mut groups_with_rows = vec![]; - - let group_rows = self.row_converter.convert_columns(group_values)?; - let n_rows = group_rows.num_rows(); - // 1.1 Calculate the group keys for the group values - let mut batch_hashes = vec![0; n_rows]; - create_hashes(group_values, &self.random_state, &mut batch_hashes)?; - - let AggregationState { - map, - ordered_group_states: group_states, - ..
} = &mut self.aggr_state; - - for (row, hash) in batch_hashes.into_iter().enumerate() { - let entry = map.get_mut(hash, |(_hash, group_idx)| { - // verify that a group that we are inserting with hash is - // actually the same key value as the group in - // existing_idx (aka group_values @ row) - let group_state = &group_states[*group_idx].group_state; - group_rows.row(row) == group_state.group_by_values.row() - }); - - match entry { - // Existing entry for this group value - Some((_hash, group_idx)) => { - let group_state = &mut group_states[*group_idx].group_state; - - // 1.3 - if group_state.indices.is_empty() { - groups_with_rows.push(*group_idx); - }; - - group_state.indices.push_accounted(row as u32, allocated); // remember this row - } - // 1.2 Need to create new entry - None => { - let accumulator_set = - aggregates::create_accumulators(&self.normal_aggr_expr)?; - let row_values = get_row_at_idx(group_values, row)?; - let ordered_columns = self - .aggregation_ordering - .order_indices - .iter() - .map(|idx| row_values[*idx].clone()) - .collect::<Vec<_>>(); - let group_state = GroupState { - group_by_values: group_rows.row(row).owned(), - aggregation_buffer: vec![ - 0; - self.row_aggr_layout.fixed_part_width() - ], - accumulator_set, - indices: vec![row as u32], // 1.3 - }; - let group_idx = group_states.len(); - - // NOTE: do NOT include the `GroupState` struct size in here because this is captured by - // `group_states` (see allocation down below) - *allocated += std::mem::size_of_val(&group_state.group_by_values) - + (std::mem::size_of::<u8>() - * group_state.aggregation_buffer.capacity()) - + (std::mem::size_of::<u32>() * group_state.indices.capacity()); - - // Allocation done by normal accumulators - *allocated += (std::mem::size_of::<Box<dyn Accumulator>>() - * group_state.accumulator_set.capacity()) - + group_state - .accumulator_set - .iter() - .map(|accu| accu.size()) - .sum::<usize>(); - - // for hasher function, use precomputed hash value - map.insert_accounted( - (hash, group_idx), - |(hash, _group_index)| *hash, - allocated, - ); - - // Add new entry to group_states and save newly created index - let ordered_group_state = OrderedGroupState { - group_state, - ordered_columns, - status: GroupStatus::GroupInProgress, - hash, - }; - group_states.push_accounted(ordered_group_state, allocated); - - groups_with_rows.push(group_idx); - } - }; - } - Ok(groups_with_rows) - }
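Both update paths above use the same raw hash-table idiom: the table stores only (hash, group_index) pairs, and key equality is checked by a closure that looks the candidate group up by index. A minimal sketch of that idiom, assuming the hashbrown crate with its raw feature enabled (as this file's imports suggest); the string key is a stand-in for the Row-format group values:

```rust
use hashbrown::raw::RawTable;

/// Return the group index for `key`, inserting a new group if needed.
fn lookup_or_insert(
    map: &mut RawTable<(u64, usize)>,
    hash: u64,
    group_keys: &mut Vec<String>, // stand-in for Row-format group values
    key: &str,
) -> usize {
    // equality is verified against the stored group, not a stored key
    if let Some((_hash, group_idx)) = map.get_mut(hash, |(_h, idx)| group_keys[*idx] == key) {
        return *group_idx;
    }
    let group_idx = group_keys.len();
    group_keys.push(key.to_string());
    // reuse the precomputed hash for insertion and for future resizes
    map.insert(hash, (hash, group_idx), |(h, _)| *h);
    group_idx
}
```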
- - // Update the accumulator results, according to aggr_state. - #[allow(clippy::too_many_arguments)] - fn update_accumulators_using_batch( - &mut self, - groups_with_rows: &[usize], - offsets: &[usize], - row_values: &[Vec<ArrayRef>], - normal_values: &[Vec<ArrayRef>], - row_filter_values: &[Option<ArrayRef>], - normal_filter_values: &[Option<ArrayRef>], - allocated: &mut usize, - ) -> Result<()> { - // 2.1 for each key in this batch - // 2.2 for each aggregation - // 2.3 `slice` from each of its arrays the keys' values - // 2.4 update / merge the accumulator with the values - // 2.5 clear indices - groups_with_rows - .iter() - .zip(offsets.windows(2)) - .try_for_each(|(group_idx, offsets)| { - let group_state = - &mut self.aggr_state.ordered_group_states[*group_idx].group_state; - // 2.2 - // Process row accumulators - self.row_accumulators - .iter_mut() - .zip(row_values.iter()) - .zip(row_filter_values.iter()) - .try_for_each(|((accumulator, aggr_array), filter_opt)| { - let values = slice_and_maybe_filter( - aggr_array, - filter_opt.as_ref(), - offsets, - )?; - let mut state_accessor = - RowAccessor::new_from_layout(self.row_aggr_layout.clone()); - state_accessor - .point_to(0, group_state.aggregation_buffer.as_mut_slice()); - match self.mode { - AggregateMode::Partial | AggregateMode::Single => { - accumulator.update_batch(&values, &mut state_accessor) - } - AggregateMode::FinalPartitioned | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values, &mut state_accessor) - } - } - })?; - // normal accumulators - group_state - .accumulator_set - .iter_mut() - .zip(normal_values.iter()) - .zip(normal_filter_values.iter()) - .try_for_each(|((accumulator, aggr_array), filter_opt)| { - let values = slice_and_maybe_filter( - aggr_array, - filter_opt.as_ref(), - offsets, - )?; - let size_pre = accumulator.size(); - let res = match self.mode { - AggregateMode::Partial | AggregateMode::Single => { - accumulator.update_batch(&values) - } - AggregateMode::FinalPartitioned | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values) - } - }; - let size_post = accumulator.size(); - *allocated += size_post.saturating_sub(size_pre); - res - }) - // 2.5 - .and({ - group_state.indices.clear(); - Ok(()) - }) - })?; - Ok(()) - } - - // Update the accumulator results, according to aggr_state.
- fn update_accumulators_using_scalar( - &mut self, - groups_with_rows: &[usize], - row_values: &[Vec<ArrayRef>], - row_filter_values: &[Option<ArrayRef>], - ) -> Result<()> { - let filter_bool_array = row_filter_values - .iter() - .map(|filter_opt| match filter_opt { - Some(f) => Ok(Some(as_boolean_array(f)?)), - None => Ok(None), - }) - .collect::<Result<Vec<_>>>()?; - - for group_idx in groups_with_rows { - let group_state = - &mut self.aggr_state.ordered_group_states[*group_idx].group_state; - let mut state_accessor = - RowAccessor::new_from_layout(self.row_aggr_layout.clone()); - state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice()); - for idx in &group_state.indices { - for (accumulator, values_array, filter_array) in izip!( - self.row_accumulators.iter_mut(), - row_values.iter(), - filter_bool_array.iter() - ) { - if values_array.len() == 1 { - let scalar_value = - col_to_scalar(&values_array[0], filter_array, *idx as usize)?; - accumulator.update_scalar(&scalar_value, &mut state_accessor)?; - } else { - let scalar_values = values_array - .iter() - .map(|array| { - col_to_scalar(array, filter_array, *idx as usize) - }) - .collect::<Result<Vec<_>>>()?; - accumulator - .update_scalar_values(&scalar_values, &mut state_accessor)?; - } - } - } - // clear the group indices in this group - group_state.indices.clear(); - } - - Ok(()) - } - - /// Perform group-by aggregation for the given [`RecordBatch`]. - /// - /// If successful, this returns the additional number of bytes that were allocated during this process. - /// - fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<usize> { - // Evaluate the grouping expressions: - let group_by_values = evaluate_group_by(&self.group_by, &batch)?; - // Keep track of memory allocated: - let mut allocated = 0usize; - - // Evaluate the aggregation expressions. - // We could evaluate them after the `take`, but since we need to evaluate all - // of them anyway, it is more performant to do it while they are together.
- let row_aggr_input_values = - evaluate_many(&self.row_aggregate_expressions, &batch)?; - let normal_aggr_input_values = - evaluate_many(&self.normal_aggregate_expressions, &batch)?; - let row_filter_values = evaluate_optional(&self.row_filter_expressions, &batch)?; - let normal_filter_values = - evaluate_optional(&self.normal_filter_expressions, &batch)?; - - let row_converter_size_pre = self.row_converter.size(); - for group_values in &group_by_values { - // If the input is fully sorted on its grouping keys - let groups_with_rows = if let AggregationOrdering { - mode: GroupByOrderMode::FullyOrdered, - order_indices, - ordering, - } = &self.aggregation_ordering - { - let group_rows = self.row_converter.convert_columns(group_values)?; - let n_rows = group_rows.num_rows(); - // 1.1 Calculate the group keys for the group values - let mut batch_hashes = vec![0; n_rows]; - create_hashes(group_values, &self.random_state, &mut batch_hashes)?; - let sort_column = order_indices - .iter() - .enumerate() - .map(|(idx, cur_idx)| SortColumn { - values: group_values[*cur_idx].clone(), - options: Some(ordering[idx].options), - }) - .collect::<Vec<_>>(); - let n_rows = group_rows.num_rows(); - // determine the boundaries between groups - let ranges = evaluate_partition_ranges(n_rows, &sort_column)?; - let per_group_order_info = ranges - .into_iter() - .map(|range| GroupOrderInfo { - owned_row: group_rows.row(range.start).owned(), - hash: batch_hashes[range.start], - range, - }) - .collect::<Vec<_>>(); - self.update_fully_ordered_group_state( - group_values, - per_group_order_info, - &mut allocated, - )? - } else { - self.update_group_state(group_values, &mut allocated)? - }; - - // Decide the accumulator update mode: use scalar values to update the accumulators when all of the following conditions are met: - // 1) The aggregation mode is Partial or Single - // 2) There are no normal aggregation expressions - // 3) The number of affected groups is high (many entries in `aggr_state` have rows that need updating).
This is usually the high-cardinality case. - if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single) - && normal_aggr_input_values.is_empty() - && normal_filter_values.is_empty() - && groups_with_rows.len() >= batch.num_rows() / self.scalar_update_factor - { - self.update_accumulators_using_scalar( - &groups_with_rows, - &row_aggr_input_values, - &row_filter_values, - )?; - } else { - // Collect all indices + offsets based on keys in this vec - let mut batch_indices = UInt32Builder::with_capacity(0); - let mut offsets = vec![0]; - let mut offset_so_far = 0; - for &group_idx in groups_with_rows.iter() { - let indices = &self.aggr_state.ordered_group_states[group_idx] - .group_state - .indices; - batch_indices.append_slice(indices); - offset_so_far += indices.len(); - offsets.push(offset_so_far); - } - let batch_indices = batch_indices.finish(); - - let row_filter_values = - get_optional_filters(&row_filter_values, &batch_indices); - let normal_filter_values = - get_optional_filters(&normal_filter_values, &batch_indices); - if self.aggregation_ordering.mode == GroupByOrderMode::FullyOrdered { - self.update_accumulators_using_batch( - &groups_with_rows, - &offsets, - &row_aggr_input_values, - &normal_aggr_input_values, - &row_filter_values, - &normal_filter_values, - &mut allocated, - )?; - } else { - let row_values = - get_at_indices(&row_aggr_input_values, &batch_indices)?; - let normal_values = - get_at_indices(&normal_aggr_input_values, &batch_indices)?; - self.update_accumulators_using_batch( - &groups_with_rows, - &offsets, - &row_values, - &normal_values, - &row_filter_values, - &normal_filter_values, - &mut allocated, - )?; - }; - } - } - allocated += self - .row_converter - .size() - .saturating_sub(row_converter_size_pre); - - let mut new_result = false; - let last_ordered_columns = self - .aggr_state - .ordered_group_states - .last() - .map(|item| item.ordered_columns.clone()); - - if let Some(last_ordered_columns) = last_ordered_columns { - for cur_group in &mut self.aggr_state.ordered_group_states { - if cur_group.ordered_columns != last_ordered_columns { - // We will no longer receive values for this group. Set its status to GroupStatus::CanEmit, - // meaning we can generate a result for it. - cur_group.status = GroupStatus::CanEmit; - new_result = true; - } - } - } - if new_result { - self.exec_state = ExecutionState::ProducingOutput; - } - - Ok(allocated) - } -} - -/// Tracks the state of the ordered grouping -#[derive(Debug, PartialEq)] -enum GroupStatus { - /// Data for current group is not complete, and new data may yet - /// arrive. - GroupInProgress, - /// Data for current group is completed, and its result can be emitted. - CanEmit, - /// Result for the group has been successfully emitted, and its - /// state can be pruned. - Emitted, -} - -/// Information about the order of the state that is built for each -/// output group. -#[derive(Debug)] -pub struct OrderedGroupState { - /// Aggregate values - group_state: GroupState, - /// The actual value of the ordered columns for this group - ordered_columns: Vec<ScalarValue>, - /// Can we emit this group?
- status: GroupStatus, - /// Hash value of the group - hash: u64, -} - -/// The state of all the groups -pub struct AggregationState { - pub reservation: MemoryReservation, - - /// Logically maps group values to an index in `group_states` - /// - /// Uses the raw API of hashbrown to avoid actually storing the - /// keys in the table - /// - /// keys: u64 hashes of the GroupValue - /// values: (hash, index into `group_states`) - pub map: RawTable<(u64, usize)>, - - /// State for each group - pub ordered_group_states: Vec<OrderedGroupState>, -} - -impl std::fmt::Debug for AggregationState { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - // hashes are not stored inline, so we can only print a placeholder - let map_string = "RawTable"; - f.debug_struct("AggregationState") - .field("map", &map_string) - .field("ordered_group_states", &self.ordered_group_states) - .finish() - } -} - -impl BoundedAggregateStream { - /// Prune the groups from [`Self::ordered_group_states`] which are in - /// [`GroupStatus::Emitted`]. - /// - /// Emitted means that the result of this group has already been - /// emitted, and we are sure that these groups cannot receive new - /// rows. - fn prune(&mut self) { - // clear out emitted groups - let n_partition = self.aggr_state.ordered_group_states.len(); - self.aggr_state - .ordered_group_states - .retain(|elem| elem.status != GroupStatus::Emitted); - - let n_partition_new = self.aggr_state.ordered_group_states.len(); - let n_pruned = n_partition - n_partition_new; - - // update hash table with the new indexes of the remaining groups - self.aggr_state.map.clear(); - for (idx, item) in self.aggr_state.ordered_group_states.iter().enumerate() { - self.aggr_state - .map - .insert(item.hash, (item.hash, idx), |(hash, _)| *hash); - } - self.row_group_skip_position -= n_pruned; - } - - /// Create a RecordBatch with all group keys and accumulators' states or values. - fn create_batch_from_map(&mut self) -> Result<Option<RecordBatch>> { - let skip_items = self.row_group_skip_position; - if skip_items > self.aggr_state.ordered_group_states.len() || self.is_end { - return Ok(None); - } - self.is_end |= skip_items == self.aggr_state.ordered_group_states.len(); - if self.aggr_state.ordered_group_states.is_empty() { - let schema = self.schema.clone(); - return Ok(Some(RecordBatch::new_empty(schema))); - } - - let end_idx = min( - skip_items + self.batch_size, - self.aggr_state.ordered_group_states.len(), - ); - let group_state_chunk = - &self.aggr_state.ordered_group_states[skip_items..end_idx]; - - // Consider only the groups that can be emitted. (The ones that we - // are sure will not receive new entries.) - let group_state_chunk = group_state_chunk - .iter() - .filter(|item| item.status == GroupStatus::CanEmit) - .collect::<Vec<_>>(); - - if group_state_chunk.is_empty() { - let schema = self.schema.clone(); - return Ok(Some(RecordBatch::new_empty(schema))); - } - - // Buffers for each distinct group (i.e.
row accumulator memories) - let mut state_buffers = group_state_chunk - .iter() - .map(|gs| gs.group_state.aggregation_buffer.clone()) - .collect::<Vec<_>>(); - - let output_fields = self.schema.fields(); - // Store row accumulator results (either final output or intermediate state): - let row_columns = match self.mode { - AggregateMode::Partial => { - read_as_batch(&state_buffers, &self.row_aggr_schema) - } - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single => { - let mut results = vec![]; - for (idx, acc) in self.row_accumulators.iter().enumerate() { - let mut state_accessor = RowAccessor::new(&self.row_aggr_schema); - let current = state_buffers - .iter_mut() - .map(|buffer| { - state_accessor.point_to(0, buffer); - acc.evaluate(&state_accessor) - }) - .collect::<Result<Vec<_>>>()?; - // Get corresponding field for row accumulator - let field = &output_fields[self.indices[1][idx].start]; - let result = if current.is_empty() { - Ok(arrow::array::new_empty_array(field.data_type())) - } else { - let item = ScalarValue::iter_to_array(current)?; - // cast output if needed (e.g. for types like Dictionary where - // the intermediate GroupByScalar type was not the same as the - // output) - cast(&item, field.data_type()) - }?; - results.push(result); - } - results - } - }; - - // Store normal accumulator results (either final output or intermediate state): - let mut columns = vec![]; - for (idx, &Range { start, end }) in self.indices[0].iter().enumerate() { - for (field_idx, field) in output_fields[start..end].iter().enumerate() { - let current = match self.mode { - AggregateMode::Partial => ScalarValue::iter_to_array( - group_state_chunk.iter().map(|group_state| { - group_state.group_state.accumulator_set[idx] - .state() - .map(|v| v[field_idx].clone()) - .expect("Unexpected accumulator state in hash aggregate") - }), - ), - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single => ScalarValue::iter_to_array( - group_state_chunk.iter().map(|group_state| { - group_state.group_state.accumulator_set[idx] - .evaluate() - .expect("Unexpected accumulator state in hash aggregate") - }), - ), - }?; - // Cast output if needed (e.g. for types like Dictionary where - // the intermediate GroupByScalar type was not the same as the - // output) - let result = cast(&current, field.data_type())?; - columns.push(result); - } - } - - // Stores the group by fields - let group_buffers = group_state_chunk - .iter() - .map(|gs| gs.group_state.group_by_values.row()) - .collect::<Vec<_>>(); - let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(group_buffers)?; - - // The number of output columns occupied by the row and normal accumulators - let extra: usize = self - .indices - .iter() - .flatten() - .map(|Range { start, end }| end - start) - .sum(); - let empty_arr = new_null_array(&DataType::Null, 1); - output.extend(std::iter::repeat(empty_arr).take(extra)); - - // Write results of both accumulator types to the corresponding location in - // the output schema: - let results = [columns.into_iter(), row_columns.into_iter()]; - for (outer, mut current) in results.into_iter().enumerate() { - for &Range { start, end } in self.indices[outer].iter() { - for item in output.iter_mut().take(end).skip(start) { - *item = current.next().expect("Columns cannot be empty"); - } - } - }
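The status bookkeeping that follows is the last step of a three-state lifecycle each group moves through. A condensed, illustrative sketch of the transitions this stream performs (the enum mirrors the GroupStatus defined earlier in this file):

```rust
#[derive(Debug, PartialEq)]
enum GroupStatus {
    GroupInProgress,
    CanEmit,
    Emitted,
}

/// Illustrative only: the one-way transitions BoundedAggregateStream performs.
fn advance(status: GroupStatus) -> GroupStatus {
    match status {
        // a later sort key arrived (or input ended): the group is complete
        GroupStatus::GroupInProgress => GroupStatus::CanEmit,
        // the group's row was included in an output batch
        GroupStatus::CanEmit => GroupStatus::Emitted,
        // emitted groups are pruned from the state, never revisited
        GroupStatus::Emitted => panic!("no transitions out of Emitted"),
    }
}
```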
- - // Set the status of the emitted groups to GroupStatus::Emitted. - for gs in self.aggr_state.ordered_group_states[skip_items..end_idx].iter_mut() { - if gs.status == GroupStatus::CanEmit { - gs.status = GroupStatus::Emitted; - } - } - - Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?)) - } -} diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 5edb0476778d5..88794dc0d5c62 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -18,8 +18,7 @@ //! Aggregates functionalities use crate::physical_plan::aggregates::{ - bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream, - row_hash::GroupedHashAggregateStream, + no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream, }; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ @@ -46,10 +45,9 @@ use std::any::Any; use std::collections::HashMap; use std::sync::Arc; -mod bounded_aggregate_stream; mod no_grouping; +mod order; mod row_hash; -mod utils; pub use datafusion_expr::AggregateFunction; use datafusion_physical_expr::aggregate::is_order_sensitive; @@ -89,7 +87,7 @@ pub enum AggregateMode { /// Specifically, each distinct combination of the relevant columns /// is contiguous in the input, and once a new combination is seen /// previous combinations are guaranteed never to appear again -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum GroupByOrderMode { /// The input is not (known to be) ordered by any of the /// expressions in the GROUP BY clause. @@ -212,7 +210,7 @@ impl PartialEq for PhysicalGroupBy { enum StreamType { AggregateStream(AggregateStream), GroupedHashAggregateStream(GroupedHashAggregateStream), - BoundedAggregate(BoundedAggregateStream), + //BoundedAggregate(BoundedAggregateStream), } impl From<StreamType> for SendableRecordBatchStream { @@ -220,7 +218,7 @@ impl From<StreamType> for SendableRecordBatchStream { match stream { StreamType::AggregateStream(stream) => Box::pin(stream), StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream), - StreamType::BoundedAggregate(stream) => Box::pin(stream), + //StreamType::BoundedAggregate(stream) => Box::pin(stream), } } } @@ -719,14 +717,6 @@ impl AggregateExec { Ok(StreamType::AggregateStream(AggregateStream::new( self, context, partition, )?)) - } else if let Some(aggregation_ordering) = &self.aggregation_ordering { - let aggregation_ordering = aggregation_ordering.clone(); - Ok(StreamType::BoundedAggregate(BoundedAggregateStream::new( - self, - context, - partition, - aggregation_ordering, - )?)) } else { Ok(StreamType::GroupedHashAggregateStream( GroupedHashAggregateStream::new(self, context, partition)?, @@ -1105,6 +1095,7 @@ fn create_accumulators( .collect::<Result<Vec<_>>>() } +#[allow(dead_code)] fn create_row_accumulators( aggr_expr: &[Arc<dyn AggregateExpr>], ) -> Result<Vec<RowAccumulatorItem>> { diff --git a/datafusion/core/src/physical_plan/aggregates/order/full.rs b/datafusion/core/src/physical_plan/aggregates/order/full.rs new file mode 100644 index 0000000000000..d8820c826d437 --- /dev/null +++ b/datafusion/core/src/physical_plan/aggregates/order/full.rs @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::physical_expr::EmitTo; + +/// Tracks grouping state when the data is ordered entirely by its +/// group keys +/// +/// When the group values are sorted, as soon as we see group `n+1` we +/// know we will never see any rows for group `n` again and thus they +/// can be emitted. +/// +/// ```text +/// SUM(amt) GROUP BY id +/// +/// The input is sorted by id +/// +/// +/// ┌─────┐ ┌──────────────────┐ +/// │┌───┐│ │ ┌──────────────┐ │ ┏━━━━━━━━━━━━━━┓ +/// ││ 0 ││ │ │ 123 │ │ ┌─────┃ 13 ┃ +/// │└───┘│ │ └──────────────┘ │ │ ┗━━━━━━━━━━━━━━┛ +/// │ ... │ │ ... │ │ +/// │┌───┐│ │ ┌──────────────┐ │ │ current +/// ││12 ││ │ │ 234 │ │ │ +/// │├───┤│ │ ├──────────────┤ │ │ +/// ││12 ││ │ │ 234 │ │ │ +/// │├───┤│ │ ├──────────────┤ │ │ +/// ││13 ││ │ │ 456 │◀┼───┘ +/// │└───┘│ │ └──────────────┘ │ +/// └─────┘ └──────────────────┘ +/// +/// group indices group_values current tracks the most +/// (in group value recent group index +/// order) +/// ``` +/// +/// In the above diagram the current group is `13`; groups `0..12` can +/// be emitted. Group `13` cannot be emitted because it may have more +/// values in the next batch. +#[derive(Debug)] +pub(crate) struct GroupOrderingFull { + state: State, + /// Hash values for groups in 0..completed + hashes: Vec<u64>, +} + +#[derive(Debug)] +enum State { + /// Have seen no input yet + Start, + + /// Have seen all groups with indexes less than `current` + InProgress { + /// index of the current group for which values are being + /// generated (can emit current - 1) + current: usize, + }, + + /// Seen end of input, all groups can be emitted + Complete, +} + +impl GroupOrderingFull { + pub fn new() -> Self { + Self { + state: State::Start, + hashes: vec![], + } + } + + // How far can data be emitted? Returns None if no data can be + // emitted + pub fn emit_to(&self) -> Option<EmitTo> { + match &self.state { + State::Start => None, + State::InProgress { current, .. } => { + // Can not emit if we are still on the first row, + // otherwise emit all rows prior to the current group + if *current == 0 { + None + } else { + Some(EmitTo::First(*current)) + } + } + State::Complete { .. } => Some(EmitTo::All), + } + } + + /// removes the first n groups from this ordering, shifting all + /// existing indexes down by N and returns a reference to the + /// updated hashes + pub fn remove_groups(&mut self, n: usize) -> &[u64] { + match &mut self.state { + State::Start => panic!("invalid state: start"), + State::InProgress { current } => { + // shift down by n + assert!(*current >= n); + *current -= n; + self.hashes.drain(0..n); + } + State::Complete { .. } => panic!("invalid state: complete"), + }; + &self.hashes + } + + /// Note that the input is complete so any outstanding groups are done as well + pub fn input_done(&mut self) { + self.state = match self.state { + State::Start => State::Complete, + State::InProgress { ..
} => State::Complete, + State::Complete => State::Complete, + }; + } + + /// Note that we saw a new distinct group + pub fn new_group(&mut self, group_index: usize, hash: u64) { + self.state = match self.state { + State::Start => { + assert_eq!(group_index, 0); + self.hashes.push(hash); + State::InProgress { + current: group_index, + } + } + State::InProgress { current } => { + // expect group_index to be the next one after the current + assert_eq!(group_index, self.hashes.len()); + assert_eq!(group_index, current + 1); + self.hashes.push(hash); + State::InProgress { + current: group_index, + } + } + State::Complete { .. } => { + panic!("Saw new group after input was complete"); + } + }; + } +} diff --git a/datafusion/core/src/physical_plan/aggregates/order/mod.rs b/datafusion/core/src/physical_plan/aggregates/order/mod.rs new file mode 100644 index 0000000000000..6f62ebe1b8eef --- /dev/null +++ b/datafusion/core/src/physical_plan/aggregates/order/mod.rs @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Order tracking for memory-bounded grouping + +use arrow_schema::Schema; +use datafusion_common::Result; +use datafusion_physical_expr::EmitTo; + +use super::{AggregationOrdering, GroupByOrderMode}; + +mod full; +mod partial; + +pub(crate) use full::GroupOrderingFull; +pub(crate) use partial::GroupOrderingPartial; + +/// Group ordering state, if present, for each group in the hash +/// table. +#[derive(Debug)] +pub(crate) enum GroupOrdering { + /// Groups are not ordered + None, + /// Groups are ordered by some subset of the group keys + Partial(GroupOrderingPartial), + /// Groups are entirely contiguous, + Full(GroupOrderingFull), + /// The ordering was temporarily taken / borrowed. + /// Note: `Self::Taken` is left when the GroupOrdering is temporarily + /// taken to satisfy the borrow checker. If an error happens + /// before it can be restored, the ordering information is lost and + /// execution cannot proceed. By panicking, the behavior remains + /// well defined if something tries to use an ordering that was + /// taken.
+ Taken, +} + +// Default is used for `std::mem::take` to satisfy the borrow checker +impl Default for GroupOrdering { + fn default() -> Self { + Self::Taken + } +} + +impl GroupOrdering { + /// Create a `GroupOrdering` for the ordering + pub fn try_new( + input_schema: &Schema, + ordering: &AggregationOrdering, + ) -> Result<Self> { + let AggregationOrdering { + mode, + order_indices, + ordering, + } = ordering; + + Ok(match mode { + GroupByOrderMode::None => GroupOrdering::None, + GroupByOrderMode::PartiallyOrdered => { + let partial = + GroupOrderingPartial::try_new(input_schema, order_indices, ordering)?; + GroupOrdering::Partial(partial) + } + GroupByOrderMode::FullyOrdered => { + GroupOrdering::Full(GroupOrderingFull::new()) + } + }) + } + + // How far can data be emitted based on groups seen so far? + // Returns `None` if nothing can be emitted at this point based on + // ordering information + pub fn emit_to(&self) -> Option<EmitTo> { + match self { + GroupOrdering::Taken => panic!("group state taken"), + GroupOrdering::None => None, + GroupOrdering::Partial(partial) => partial.emit_to(), + GroupOrdering::Full(full) => full.emit_to(), + } + } + + /// Updates the state to reflect that the input is done + pub fn input_done(&mut self) { + match self { + GroupOrdering::Taken => panic!("group state taken"), + GroupOrdering::None => {} + GroupOrdering::Partial(partial) => partial.input_done(), + GroupOrdering::Full(full) => full.input_done(), + } + } + + /// removes the first n groups from this ordering, shifting all + /// existing indexes down by N and returns a reference to the + /// updated hashes + pub fn remove_groups(&mut self, n: usize) -> &[u64] { + match self { + GroupOrdering::Taken => panic!("group state taken"), + GroupOrdering::None => &[], + GroupOrdering::Partial(partial) => partial.remove_groups(n), + GroupOrdering::Full(full) => full.remove_groups(n), + } + } +} diff --git a/datafusion/core/src/physical_plan/aggregates/order/partial.rs b/datafusion/core/src/physical_plan/aggregates/order/partial.rs new file mode 100644 index 0000000000000..03741a2bd4672 --- /dev/null +++ b/datafusion/core/src/physical_plan/aggregates/order/partial.rs @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::physical_expr::EmitTo; +use arrow::row::{OwnedRow, Row, RowConverter, Rows, SortField}; +use arrow_array::ArrayRef; +use arrow_schema::Schema; +use datafusion_common::Result; +use datafusion_physical_expr::PhysicalSortExpr; + +/// Tracks grouping state when the data is ordered by some subset of its +/// group keys. +/// +/// In this case, once we see the next sort key value, we know we will +/// never see groups with that sort key again, so we can emit all +/// previous groups up to that point.
+/// +/// ```text +/// +/// SUM(amt) GROUP BY id, state +/// +/// The input is sorted by state +/// +/// ┏━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━┓ +/// ┌─────┐ ┌───────────────────┐ ┌─────┃ 9 ┃ ┃ "MD" ┃ +/// │┌───┐│ │ ┌──────────────┐ │ │ ┗━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━┛ +/// ││ 0 ││ │ │ 123, "MA" │ │ │ current_sort sort_key +/// │└───┘│ │ └──────────────┘ │ │ +/// │ ... │ │ ... │ │ current_sort tracks the most +/// │┌───┐│ │ ┌──────────────┐ │ │ recent group index that had +/// ││12 ││ │ │ 765, "MA" │ │ │ the same sort_key as current +/// │├───┤│ │ ├──────────────┤ │ │ +/// ││12 ││ │ │ 923, "MD" │◀─┼─┘ +/// │├───┤│ │ ├──────────────┤ │ ┏━━━━━━━━━━━━━━┓ +/// ││13 ││ │ │ 345, "MD" │◀─┼────────┃ 12 ┃ +/// │└───┘│ │ └──────────────┘ │ ┗━━━━━━━━━━━━━━┛ +/// └─────┘ └───────────────────┘ current +/// group indices +/// (in group value group_values current tracks the most +/// order) recent group index +///``` +#[derive(Debug)] +pub(crate) struct GroupOrderingPartial { + /// State machine + state: State, + + /// The indexes in the group by expression that form the sort key + order_indices: Vec<usize>, + + /// Converter for the columns of the group by that make up the sort key. + row_converter: RowConverter, + + /// Hash values for groups in 0..completed + hashes: Vec<u64>, +} + +/// Tracks the state of the grouping +#[derive(Debug, Default)] +enum State { + /// Taken to potentially be updated. + #[default] + Taken, + + /// Have seen no input yet + Start, + + /// Have seen all groups with indexes less than `current` + InProgress { + /// first group index with the current sort_key + current_sort: usize, + /// The sort key of group_index `current_sort` + sort_key: OwnedRow, + /// index of the current group for which values are being + /// generated + current: usize, + }, + + /// Seen end of input, all groups can be emitted + Complete, +} + +impl GroupOrderingPartial { + pub fn try_new( + input_schema: &Schema, + order_indices: &[usize], + ordering: &[PhysicalSortExpr], + ) -> Result<Self> { + assert!(!order_indices.is_empty()); + assert_eq!(order_indices.len(), ordering.len()); + + let fields = ordering + .iter() + .map(|sort_expr| { + Ok(SortField::new_with_options( + sort_expr.expr.data_type(input_schema)?, + sort_expr.options, + )) + }) + .collect::<Result<Vec<_>>>()?; + + Ok(Self { + state: State::Start, + order_indices: order_indices.to_vec(), + row_converter: RowConverter::new(fields)?, + hashes: vec![], + }) + } + + /// Creates SortKeys from the group values + /// + /// For example, if group_values had `A, B, C` but the input was + /// only sorted on `B` and `C` this should return rows for (`B`, + /// `C`) + pub fn compute_sort_keys(&mut self, group_values: &[ArrayRef]) -> Result<Rows> { + // Take only the columns that are in the sort key + let sort_values: Vec<_> = self + .order_indices + .iter() + .map(|&idx| group_values[idx].clone()) + .collect(); + + Ok(self.row_converter.convert_columns(&sort_values)?) + } + + /// How far can data be emitted? Returns None if no data can be + /// emitted + pub fn emit_to(&self) -> Option<EmitTo> { + match &self.state { + State::Taken => unreachable!("State previously taken"), + State::Start => None, + State::InProgress { current_sort, ..
} => { + // Can not emit if we are still within the first sort key; + // otherwise we can emit all groups that had earlier sort keys + // + if *current_sort == 0 { + None + } else { + Some(EmitTo::First(*current_sort)) + } + } + State::Complete => Some(EmitTo::All), + } + } + + /// removes the first n groups from this ordering, shifting all + /// existing indexes down by N and returns a reference to the + /// updated hashes + pub fn remove_groups(&mut self, n: usize) -> &[u64] { + match &mut self.state { + State::Taken => unreachable!("State previously taken"), + State::Start => panic!("invalid state: start"), + State::InProgress { + current_sort, + current, + sort_key: _, + } => { + // shift indexes down by n + assert!(*current >= n); + *current -= n; + assert!(*current_sort >= n); + *current_sort -= n; + // Note sort_key stays the same, we are just translating group indexes + self.hashes.drain(0..n); + } + State::Complete { .. } => panic!("invalid state: complete"), + }; + &self.hashes + } + + /// Note that the input is complete so any outstanding groups are done as well + pub fn input_done(&mut self) { + self.state = match self.state { + State::Taken => unreachable!("State previously taken"), + State::Start => State::Complete, + State::InProgress { .. } => State::Complete, + State::Complete => State::Complete, + }; + } + + /// Note that we saw a new distinct group with the specified group's sort key + pub fn new_group(&mut self, group_index: usize, group_sort_key: Row, hash: u64) { + let old_state = std::mem::take(&mut self.state); + self.state = match old_state { + State::Taken => unreachable!("State previously taken"), + State::Start => { + assert_eq!(group_index, 0); + self.hashes.push(hash); + State::InProgress { + current_sort: 0, + sort_key: group_sort_key.owned(), + current: 0, + } + } + State::InProgress { + current_sort, + sort_key, + current, + } => { + // expect group_index to be the next one after the current + assert_eq!(group_index, self.hashes.len()); + assert_eq!(group_index, current + 1); + self.hashes.push(hash); + + // Has this group seen a new sort_key? + if sort_key.row() != group_sort_key { + State::InProgress { + current_sort: group_index, + sort_key: group_sort_key.owned(), + current: group_index, + } + } + // same sort key + else { + State::InProgress { + current_sort, + sort_key, + current: group_index, + } + } + } + State::Complete => { + panic!("Saw new group after the end of input"); + } + } + } +} diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index e272b60b054a8..010256d4916b2 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -15,12 +15,10 @@ // specific language governing permissions and limitations // under the License. -//! Hash aggregation through row format -//! -//! POC demonstration of GroupByHashApproach +//!
Hash aggregation use datafusion_physical_expr::{ - AggregateExpr, GroupsAccumulator, GroupsAccumulatorAdapter, + AggregateExpr, EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter, }; use log::debug; use std::sync::Arc; @@ -58,6 +56,7 @@ pub(crate) enum ExecutionState { Done, } +use super::order::GroupOrdering; use super::AggregateExec; /// Hash based Grouping Aggregator @@ -199,10 +198,18 @@ pub(crate) struct GroupedHashAggregateStream { /// max rows in output RecordBatches batch_size: usize, + + /// Optional ordering information that might allow the hash table + /// to be emitted before seeing more input + group_ordering: GroupOrdering, + + /// Have we seen the end of the input? + input_done: bool, } impl GroupedHashAggregateStream { /// Create a new GroupedHashAggregateStream + /// pub fn new( agg: &AggregateExec, context: Arc<TaskContext>, @@ -256,6 +263,13 @@ impl GroupedHashAggregateStream { let map = RawTable::with_capacity(0); let group_values = row_converter.empty_rows(0, 0); + let group_ordering = + if let Some(aggregation_ordering) = agg.aggregation_ordering.as_ref() { + GroupOrdering::try_new(&group_schema, aggregation_ordering)? + } else { + GroupOrdering::None + }; + timer.done(); let exec_state = ExecutionState::ReadingInput; @@ -277,6 +291,8 @@ impl GroupedHashAggregateStream { baseline_metrics, random_state: Default::default(), batch_size, + group_ordering, + input_done: false, }) } } @@ -301,6 +317,16 @@ fn create_group_accumulator( } } +/// Extracts a successful Ok(_) value, or returns Poll::Ready(Some(Err(e))) on error +macro_rules! extract_ok { + ($RES: expr) => {{ + match $RES { + Ok(v) => v, + Err(e) => return Poll::Ready(Some(Err(e))), + } + }}; +} + impl Stream for GroupedHashAggregateStream { type Item = Result<RecordBatch>; @@ -318,36 +344,47 @@ impl Stream for GroupedHashAggregateStream { // new batch to aggregate Some(Ok(batch)) => { let timer = elapsed_compute.timer(); - let result = self.group_aggregate_batch(batch); - timer.done(); + // Do the grouping + let allocated = self.group_aggregate_batch(batch); - // allocate memory AFTER we actually used - // the memory, which simplifies the whole + // Register allocated memory AFTER we + // actually used it, to simplify the whole // accounting and we are OK with // overshooting a bit. // // Also this means we either store the // whole record batch or not.
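The EmitTo value threaded through the new code below controls how much of the accumulated state each call slices off. Its semantics can be mirrored in a few self-contained lines (illustrative only; the real enum lives in datafusion_physical_expr):

```rust
/// Mirror of the two emission modes: everything, or only the first n groups.
#[derive(Clone, Copy)]
enum Emit {
    All,
    First(usize),
}

/// Take the emitted prefix out of `state`, leaving the remainder in place.
/// After `First(n)`, surviving groups shift down by n, which is why group
/// indexes (and the hash table) must be adjusted after a partial emit.
fn take_emitted<T>(state: &mut Vec<T>, emit: Emit) -> Vec<T> {
    match emit {
        Emit::All => std::mem::take(state),
        Emit::First(n) => state.drain(0..n).collect(),
    }
}

fn main() {
    let mut groups = vec!["a", "b", "c"];
    assert_eq!(take_emitted(&mut groups, Emit::First(2)), vec!["a", "b"]);
    assert_eq!(groups, vec!["c"]); // old index 2 has become index 0
}
```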
- let result = result.and_then(|allocated| { + extract_ok!(allocated.and_then(|allocated| { self.reservation.try_grow(allocated) - }); - - if let Err(e) = result { - return Poll::Ready(Some(Err(e))); + })); + + // If we can begin emitting rows, do so, + // otherwise keep consuming input + let to_emit = if self.input_done { + Some(EmitTo::All) + } else { + self.group_ordering.emit_to() + }; + + if let Some(to_emit) = to_emit { + let batch = + extract_ok!(self.create_batch_from_map(to_emit)); + self.exec_state = ExecutionState::ProducingOutput(batch); } + timer.done(); + } + Some(Err(e)) => { + // inner had error, return to caller + return Poll::Ready(Some(Err(e))); } - // inner had error, return to caller - Some(Err(e)) => return Poll::Ready(Some(Err(e))), - // inner is done, producing output None => { + // inner is done, emit all rows and switch to producing output + self.input_done = true; + self.group_ordering.input_done(); let timer = elapsed_compute.timer(); - match self.create_batch_from_map() { - Ok(batch) => { - self.exec_state = - ExecutionState::ProducingOutput(batch) - } - Err(e) => return Poll::Ready(Some(Err(e))), - } + let batch = + extract_ok!(self.create_batch_from_map(EmitTo::All)); + self.exec_state = ExecutionState::ProducingOutput(batch); timer.done(); } } @@ -356,7 +393,11 @@ impl Stream for GroupedHashAggregateStream { ExecutionState::ProducingOutput(batch) => { // slice off a part of the batch, if needed let output_batch = if batch.num_rows() <= self.batch_size { - self.exec_state = ExecutionState::Done; + if self.input_done { + self.exec_state = ExecutionState::Done; + } else { + self.exec_state = ExecutionState::ReadingInput + } batch } else { // output first batch_size rows @@ -397,15 +438,92 @@ impl GroupedHashAggregateStream { group_values: &[ArrayRef], allocated: &mut usize, ) -> Result<()> { + // track memory used + let group_values_size_pre = self.group_values.size(); + let scratch_size_pre = self.scratch_space.size(); + + // take ownership of the group ordering to satisfy the borrow + // checker, leaving a "Taken" item in its place + let mut group_ordering = std::mem::take(&mut self.group_ordering); + + match &mut group_ordering { + // Tried to run after error.
See comments on GroupOrdering::Taken + GroupOrdering::Taken => panic!("order state was taken"), + GroupOrdering::None => { + self.update_group_state_inner(group_values, allocated, |_, _, _| {})?; + } + // ordering columns are a subset of the groups and we + // need to make a new Rows with just the ordered + // columns to determine when the next order by key has + // been emitted + GroupOrdering::Partial(group_ordering_partial) => { + // compute the sort key values for each group + let sort_keys = group_ordering_partial.compute_sort_keys(group_values)?; + + self.update_group_state_inner( + group_values, + allocated, + |row_index, group_index, hash| { + group_ordering_partial.new_group( + group_index, + sort_keys.row(row_index), + hash, + ); + }, + )?; + } + GroupOrdering::Full(group_ordering_full) => { + self.update_group_state_inner( + group_values, + allocated, + |_row_index, group_index, hash| { + group_ordering_full.new_group(group_index, hash); + }, + )?; + } + } + + // put the updated group ordering back + self.group_ordering = group_ordering; + + // account for memory growth in scratch space + *allocated += self.scratch_space.size(); + *allocated -= scratch_size_pre; // subtract after adding to avoid underflow + + // account for any memory increase used to store group_values + *allocated += self.group_values.size(); + *allocated -= group_values_size_pre; // subtract after adding to avoid underflow + + Ok(()) + } + + /// See comments on [`Self::update_group_state`] + /// + /// Invokes `updated_fn(input_row_idx, group_idx, hash)` whenever a new + /// group is seen (aka inserted into the hash table) + /// + /// `group_idx` is the new group's index + /// + /// `hash` is the hash value for this group's row + /// + /// Note this function is templated so that the dispatch cost of + /// figuring out what to do is not done in the inner loop + fn update_group_state_inner<F>( + &mut self, + group_values: &[ArrayRef], + allocated: &mut usize, + mut updated_fn: F, + ) -> Result<()> + where + F: FnMut(usize, usize, u64), + { // Convert the group keys into the row format // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available let group_rows = self.row_converter.convert_columns(group_values)?; let n_rows = group_rows.num_rows(); - // track memory used - let group_values_size_pre = self.group_values.size(); - let scratch_size_pre = self.scratch_space.size(); - // tracks to which group each of the input rows belongs let group_indices = &mut self.scratch_space.current_group_indices; group_indices.clear(); @@ -429,8 +547,10 @@ impl GroupedHashAggregateStream { Some((_hash, group_idx)) => *group_idx, // 1.2 Need to create new entry for the group None => { - // Add new entry to aggr_state and save newly created index + // Assign a new group_index, and save group_value let group_idx = self.group_values.num_rows(); + // Do any special update + (updated_fn)(row, group_idx, hash); self.group_values.push(group_rows.row(row)); // for hasher function, use precomputed hash value @@ -444,15 +564,6 @@ impl GroupedHashAggregateStream { }; group_indices.push(group_idx); } - - // account for memory growth in scratch space - *allocated += self.scratch_space.size(); - *allocated -= scratch_size_pre; // subtract after adding to avoid underflow - - // account for any memory increase used to store group_values - *allocated += self.group_values.size(); - *allocated -= group_values_size_pre; // subtract after adding to avoid underflow - Ok(()) }
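The "templated" comment above is the key design point: the match on the `GroupOrdering` variant happens once, outside the per-row loop, and each arm passes a monomorphized `FnMut` callback into the generic inner function, so the hot loop contains no dispatch. A simplified sketch of that technique, with hypothetical names standing in for `GroupOrdering` and `update_group_state_inner`:

```rust
// Stand-in for `GroupOrdering`: which per-new-group bookkeeping is needed.
enum Mode {
    Nothing,
    Tracked,
}

// Generic inner loop, mirroring `update_group_state_inner`: the
// `on_new_group` callback is monomorphized per call site, so there is
// no per-row branch on the ordering mode inside the hot loop.
fn process_rows<F: FnMut(usize, usize, u64)>(hashes: &[u64], mut on_new_group: F) {
    for (row, &hash) in hashes.iter().enumerate() {
        let group_idx = row; // pretend every row starts a new group
        on_new_group(row, group_idx, hash);
    }
}

fn main() {
    let hashes = [10u64, 11, 12];
    let mut seen = Vec::new();

    // The match happens once, outside the loop; each arm supplies a
    // different statically-dispatched callback.
    for mode in [Mode::Nothing, Mode::Tracked] {
        match mode {
            Mode::Nothing => process_rows(&hashes, |_, _, _| {}),
            Mode::Tracked => process_rows(&hashes, |row, group_idx, hash| {
                seen.push((row, group_idx, hash));
            }),
        }
    }
    assert_eq!(seen, vec![(0, 0, 10), (1, 1, 11), (2, 2, 12)]);
}
```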
@@ -526,28 +637,80 @@ impl GroupedHashAggregateStream { Ok(allocated) } - /// Create an output RecordBatch with all group keys and accumulator states/values - fn create_batch_from_map(&mut self) -> Result<RecordBatch> { + /// Create an output RecordBatch with the group keys and + /// accumulator states/values specified in emit_to + fn create_batch_from_map(&mut self, emit_to: EmitTo) -> Result<RecordBatch> { if self.group_values.num_rows() == 0 { - let schema = self.schema.clone(); - return Ok(RecordBatch::new_empty(schema)); + return Ok(RecordBatch::new_empty(self.schema())); } + let output = self.build_output(emit_to)?; + self.remove_emitted(emit_to)?; + let batch = RecordBatch::try_new(self.schema(), output)?; + Ok(batch) + } + + /// Creates the output: + /// + /// (group value 1, group value 2, ... agg value 1, agg value 2, ...) + fn build_output(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { // First output rows are the groups - let groups_rows = self.group_values.iter(); - let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(groups_rows)?; + let mut output: Vec<ArrayRef> = match emit_to { + EmitTo::All => { + let groups_rows = self.group_values.iter(); + self.row_converter.convert_rows(groups_rows)? + } + EmitTo::First(n) => { + let groups_rows = self.group_values.iter().take(n); + self.row_converter.convert_rows(groups_rows)? + } + }; // Next output each aggregate value, from the accumulators for acc in self.accumulators.iter_mut() { + use AggregateMode::*; match self.mode { - AggregateMode::Partial => output.extend(acc.state()?), - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single => output.push(acc.evaluate()?), + Partial => output.extend(acc.state(emit_to)?), + Final | FinalPartitioned | Single => output.push(acc.evaluate(emit_to)?), } } - Ok(RecordBatch::try_new(self.schema.clone(), output)?) + Ok(output) + } + + /// Removes the first `n` groups, and adjusts all + /// group_indices appropriately + fn remove_emitted(&mut self, emit_to: EmitTo) -> Result<()> { + match emit_to { + EmitTo::All => { + // Eventually when we allow early dumping of the hash + // table (TODO write ticket) we can clear out all the + // state (hash table, and groups) + //self.map.clear(); + Ok(()) + } + EmitTo::First(n) => { + // Clear out first n group keys + // TODO file some ticket in arrow-rs to make this more efficient? + let mut new_group_values = self.row_converter.empty_rows(0, 0); + for row in self.group_values.iter().skip(n) { + new_group_values.push(row); + } + std::mem::swap(&mut new_group_values, &mut self.group_values); + + // rebuild hash table (TODO we could just remove the + // entries for each group that was emitted rather than + // rebuilding the whole thing) + + let hashes = self.group_ordering.remove_groups(n); + assert_eq!(hashes.len(), self.group_values.num_rows()); + self.map.clear(); + for (idx, &hash) in hashes.iter().enumerate() { + self.map.insert(hash, (hash, idx), |(hash, _)| *hash); + } + Ok(()) + } + } } } diff --git a/datafusion/core/src/physical_plan/aggregates/utils.rs b/datafusion/core/src/physical_plan/aggregates/utils.rs deleted file mode 100644 index a55464edd145c..0000000000000 --- a/datafusion/core/src/physical_plan/aggregates/utils.rs +++ /dev/null @@ -1,150 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership.
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This file contains various utility functions that are common to both -//! batch and streaming aggregation code. - -use crate::physical_plan::aggregates::AccumulatorItem; -use arrow::compute; -use arrow::compute::filter; -use arrow::row::OwnedRow; -use arrow_array::types::UInt32Type; -use arrow_array::{Array, ArrayRef, BooleanArray, PrimitiveArray}; -use arrow_schema::{Schema, SchemaRef}; -use datafusion_common::cast::as_boolean_array; -use datafusion_common::utils::get_arrayref_at_indices; -use datafusion_common::{DataFusionError, Result, ScalarValue}; -use datafusion_physical_expr::AggregateExpr; -use datafusion_row::reader::{read_row, RowReader}; -use datafusion_row::MutableRecordBatch; -use std::sync::Arc; - -/// This object encapsulates the state that is built for each output group. -#[derive(Debug)] -pub(crate) struct GroupState { - /// The actual group by values, stored sequentially - pub group_by_values: OwnedRow, - - // Accumulator state, stored sequentially - pub aggregation_buffer: Vec<u8>, - - // Accumulator state, one for each aggregate that doesn't support row accumulation - pub accumulator_set: Vec<AccumulatorItem>, - - /// Scratch space used to collect indices for input rows in a - /// batch that have values to aggregate, reset on each batch. - pub indices: Vec<u32>, -} - -#[derive(Debug)] -/// This object tracks the aggregation phase.
-pub(crate) enum ExecutionState { - ReadingInput, - ProducingOutput, - Done, -} - -pub(crate) fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> SchemaRef { - let fields = aggr_expr - .iter() - .flat_map(|expr| expr.state_fields().unwrap().into_iter()) - .collect::<Vec<_>>(); - Arc::new(Schema::new(fields)) -} - -pub(crate) fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> { - let mut output = MutableRecordBatch::new(rows.len(), Arc::new(schema.clone())); - let mut row = RowReader::new(schema); - - for data in rows { - row.point_to(0, data); - read_row(&row, &mut output, schema); - } - - output.output_as_columns() -} - -pub(crate) fn get_at_indices( - input_values: &[Vec<ArrayRef>], - batch_indices: &PrimitiveArray<UInt32Type>, -) -> Result<Vec<Vec<ArrayRef>>> { - input_values - .iter() - .map(|array| get_arrayref_at_indices(array, batch_indices)) - .collect() -} - -pub(crate) fn get_optional_filters( - original_values: &[Option<Arc<dyn Array>>], - batch_indices: &PrimitiveArray<UInt32Type>, -) -> Vec<Option<Arc<dyn Array>>> { - original_values - .iter() - .map(|array| { - array.as_ref().map(|array| { - compute::take( - array.as_ref(), - batch_indices, - None, // None: no index check - ) - .unwrap() - }) - }) - .collect() -} - -pub(crate) fn slice_and_maybe_filter( - aggr_array: &[ArrayRef], - filter_opt: Option<&Arc<dyn Array>>, - offsets: &[usize], -) -> Result<Vec<ArrayRef>> { - let (offset, length) = (offsets[0], offsets[1] - offsets[0]); - let sliced_arrays: Vec<ArrayRef> = aggr_array - .iter() - .map(|array| array.slice(offset, length)) - .collect(); - - if let Some(f) = filter_opt { - let sliced = f.slice(offset, length); - let filter_array = as_boolean_array(&sliced)?; - - sliced_arrays - .iter() - .map(|array| filter(array, filter_array).map_err(DataFusionError::ArrowError)) - .collect() - } else { - Ok(sliced_arrays) - } -} - -/// This method is similar to Scalar::try_from_array except for the Null handling. -/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]. -pub(crate) fn col_to_scalar( - array: &ArrayRef, - filter: &Option<&BooleanArray>, - row_index: usize, -) -> Result<ScalarValue> { - if array.is_null(row_index) { - return Ok(ScalarValue::Null); - } - if let Some(filter) = filter { - if !filter.value(row_index) { - return Ok(ScalarValue::Null); - } - } - ScalarValue::try_from_array(array, row_index) -} diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 74dd9ee1d13e1..e27c133412c2f 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -108,9 +108,6 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str .collect::<Vec<_>>(); let group_by = PhysicalGroupBy::new_single(expr); - println!("aggregate_expr: {aggregate_expr:?}"); - println!("group_by: {group_by:?}"); - let aggregate_exec_running = Arc::new( AggregateExec::try_new( AggregateMode::Partial, @@ -170,6 +167,8 @@ (i, usual_line), (i, running_line), "Inconsistent result\n\n\ + aggregate_expr: {aggregate_expr:?}\n\ + group_by: {group_by:?}\n\ Left Plan:\n{}\n\ Right Plan:\n{}\n\ schema:\n{schema}\n\ diff --git a/datafusion/core/tests/sqllogictests/test_files/aal.slt b/datafusion/core/tests/sqllogictests/test_files/aal.slt new file mode 100644 index 0000000000000..aeacc8f43d561 --- /dev/null +++ b/datafusion/core/tests/sqllogictests/test_files/aal.slt @@ -0,0 +1,243 @@ +# Tests for streaming GROUP BY on an ordered table + + +# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by +# a,b,c column.
Column a has cardinality 2, column b has cardinality 4. +# Column c has cardinality 100 (unique entries). Column d has cardinality 5. +statement ok +CREATE UNBOUNDED EXTERNAL TABLE t ( + a0 INTEGER, + a INTEGER, + b INTEGER, + c INTEGER, + d INTEGER +) +STORED AS CSV +WITH HEADER ROW +WITH ORDER (a ASC, b ASC, c ASC) +LOCATION 'tests/data/window_2.csv'; + + +# Show that it uses bounded aggregate stream... +query TT +EXPLAIN +SELECT a, SUM(b) +FROM t +GROUP BY a; +---- +logical_plan +Aggregate: groupBy=[[t.a]], aggr=[[SUM(t.b)]] +--TableScan: t projection=[a, b] +physical_plan +AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[SUM(t.b)], ordering_mode=FullyOrdered +--CoalesceBatchesExec: target_batch_size=8192 +----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 +------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[SUM(t.b)], ordering_mode=FullyOrdered +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true + +query IIII +select a, b, c, d from t; +---- +0 0 0 0 +0 0 1 2 +0 0 2 0 +0 0 3 0 +0 0 4 1 +0 0 5 1 +0 0 6 0 +0 0 7 2 +0 0 8 1 +0 0 9 4 +0 0 10 4 +0 0 11 2 +0 0 12 2 +0 0 13 1 +0 0 14 2 +0 0 15 3 +0 0 16 3 +0 0 17 2 +0 0 18 1 +0 0 19 4 +0 0 20 0 +0 0 21 3 +0 0 22 0 +0 0 23 0 +0 0 24 4 +0 1 25 0 +0 1 26 2 +0 1 27 0 +0 1 28 1 +0 1 29 1 +0 1 30 3 +0 1 31 4 +0 1 32 2 +0 1 33 2 +0 1 34 4 +0 1 35 0 +0 1 36 1 +0 1 37 4 +0 1 38 0 +0 1 39 1 +0 1 40 1 +0 1 41 3 +0 1 42 3 +0 1 43 2 +0 1 44 3 +0 1 45 0 +0 1 46 0 +0 1 47 1 +0 1 48 1 +0 1 49 3 +1 2 50 0 +1 2 51 3 +1 2 52 1 +1 2 53 1 +1 2 54 4 +1 2 55 2 +1 2 56 1 +1 2 57 1 +1 2 58 1 +1 2 59 2 +1 2 60 4 +1 2 61 3 +1 2 62 1 +1 2 63 4 +1 2 64 4 +1 2 65 0 +1 2 66 2 +1 2 67 4 +1 2 68 1 +1 2 69 1 +1 2 70 0 +1 2 71 2 +1 2 72 1 +1 2 73 1 +1 2 74 4 +1 3 75 2 +1 3 76 0 +1 3 77 2 +1 3 78 1 +1 3 79 4 +1 3 80 2 +1 3 81 0 +1 3 82 4 +1 3 83 2 +1 3 84 1 +1 3 85 1 +1 3 86 1 +1 3 87 4 +1 3 88 3 +1 3 89 4 +1 3 90 1 +1 3 91 2 +1 3 92 0 +1 3 93 0 +1 3 94 2 +1 3 95 0 +1 3 96 4 +1 3 97 2 +1 3 98 4 +1 3 99 3 + +# Force incremental output +statement ok +set datafusion.execution.batch_size = 8; + +# # Run it +# query II rowsort +# SELECT a, COUNT(b) +# FROM t +# GROUP BY a; +# ---- +# 0 50 +# 1 50 + + +# # Run it +# query II rowsort +# SELECT a, SUM(b) +# FROM t +# GROUP BY a; +# ---- +# 0 25 +# 1 125 + +# use partial grouping (data sorted on a, b, c but group by d, a) + +# Run it +query III rowsort +SELECT d, a, COUNT(b) +FROM t +GROUP BY d, a; +---- +0 0 13 +0 1 8 +1 0 12 +1 1 15 +2 0 10 +2 1 11 +3 0 8 +3 1 4 +4 0 7 +4 1 12 + + + +# Run it +query III rowsort +SELECT d, a, SUM(b) +FROM t +GROUP BY d, a; +---- +0 0 6 +0 1 21 +1 0 7 +1 1 35 +2 0 4 +2 1 29 +3 0 5 +3 1 10 +4 0 3 +4 1 30 + + +# verify +statement ok +create table t2 as select * from t; + + +# Run it +query III rowsort +SELECT d, a, COUNT(b) +FROM t2 +GROUP BY d, a; +---- +0 0 13 +0 1 8 +1 0 12 +1 1 15 +2 0 10 +2 1 11 +3 0 8 +3 1 4 +4 0 7 +4 1 12 + + + +# Run it +query III rowsort +SELECT d, a, SUM(b) +FROM t2 +GROUP BY d, a; +---- +0 0 6 +0 1 21 +1 0 7 +1 1 35 +2 0 4 +2 1 29 +3 0 5 +3 1 10 +4 0 3 +4 1 30 diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index e95e9fcf877ae..6ad0f94e032a7 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -48,6 +48,7 @@ use datafusion_common::{DataFusionError, 
Result}; use datafusion_expr::Accumulator; use datafusion_row::accessor::RowAccessor; +use super::groups_accumulator::EmitTo; use super::utils::{adjust_output_array, Decimal128Averager}; /// AVG aggregate expression @@ -560,10 +561,10 @@ where Ok(()) } - fn evaluate(&mut self) -> Result<ArrayRef> { - let counts = std::mem::take(&mut self.counts); - let sums = std::mem::take(&mut self.sums); - let nulls = self.null_state.build(); + fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> { + let counts = emit_to.take_needed(&mut self.counts); + let sums = emit_to.take_needed(&mut self.sums); + let nulls = self.null_state.build(emit_to); assert_eq!(nulls.len(), sums.len()); assert_eq!(counts.len(), sums.len()); @@ -598,12 +599,14 @@ where } // return arrays for sums and counts - fn state(&mut self) -> Result<Vec<ArrayRef>> { - let nulls = Some(self.null_state.build()); - let counts = std::mem::take(&mut self.counts); + fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { + let nulls = self.null_state.build(emit_to); + let nulls = Some(nulls); + + let counts = emit_to.take_needed(&mut self.counts); let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy - let sums = std::mem::take(&mut self.sums); + let sums = emit_to.take_needed(&mut self.sums); let sums = PrimitiveArray::<T>::new(sums.into(), nulls); // zero copy let sums = adjust_output_array(&self.sum_data_type, Arc::new(sums))?; diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index e0b9ffd81ae5b..60e15a673a0c2 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -41,6 +41,7 @@ use datafusion_row::accessor::RowAccessor; use crate::expressions::format_state_name; use super::groups_accumulator::accumulate::accumulate_indices; +use super::groups_accumulator::EmitTo; /// COUNT aggregate expression /// Returns the amount of non-null values of the given expression. @@ -171,8 +172,8 @@ impl GroupsAccumulator for CountGroupsAccumulator { Ok(()) } - fn evaluate(&mut self) -> Result<ArrayRef> { - let counts = std::mem::take(&mut self.counts); + fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> { + let counts = emit_to.take_needed(&mut self.counts); // Count is always non null (null inputs just don't contribute to the overall values) let nulls = None; @@ -182,8 +183,8 @@ impl GroupsAccumulator for CountGroupsAccumulator { } // return arrays for counts - fn state(&mut self) -> Result<Vec<ArrayRef>> { - let counts = std::mem::take(&mut self.counts); + fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { + let counts = emit_to.take_needed(&mut self.counts); let counts: PrimitiveArray<Int64Type> = Int64Array::from(counts); // zero copy, no nulls Ok(vec![Arc::new(counts) as ArrayRef]) } diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs index bcc9d30bedd84..596265a737da0 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs @@ -21,7 +21,9 @@ use arrow::datatypes::ArrowPrimitiveType; use arrow_array::{Array, BooleanArray, PrimitiveArray}; -use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer}; + +use crate::EmitTo; /// Track the accumulator null state per row: if any values for that /// group were null and if any values have been seen at all for that group.
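`NullState` above boils down to one bit of "has this group seen a non-null value" state per group, which later becomes the output validity mask. A minimal sketch of that idea using a plain `Vec<bool>` instead of arrow's `BooleanBufferBuilder` (all names here are illustrative, not from the patch):

```rust
/// Minimal stand-in for the patch's `NullState`: records whether each
/// group has seen at least one non-null input value. Groups that never
/// saw a value become NULL in the final output.
struct SeenState {
    seen_values: Vec<bool>,
}

impl SeenState {
    fn new() -> Self {
        Self { seen_values: vec![] }
    }

    /// Mark `group_idx` as having seen a non-null value.
    fn mark_seen(&mut self, group_idx: usize, total_num_groups: usize) {
        self.seen_values.resize(total_num_groups, false);
        self.seen_values[group_idx] = true;
    }

    /// Build the validity mask: true = group has a (non-null) value.
    fn build(&mut self) -> Vec<bool> {
        std::mem::take(&mut self.seen_values)
    }
}

fn main() {
    let mut state = SeenState::new();
    // three groups; only groups 0 and 2 ever see a non-null value
    state.mark_seen(0, 3);
    state.mark_seen(2, 3);
    assert_eq!(state.build(), vec![true, false, true]);
}
```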
@@ -317,13 +319,30 @@ impl NullState { } } - /// Creates the final [`NullBuffer`] representing which - /// group_indices should have null values (because they never saw - /// any values) + /// Creates a [`NullBuffer`] representing which group_indices + /// should have null values (because they never saw any values) + /// for the `emit_to` rows. /// - /// resets the internal state to empty - pub fn build(&mut self) -> NullBuffer { - NullBuffer::new(self.seen_values.finish()) + /// resets the internal state appropriately + pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer { + let nulls: BooleanBuffer = self.seen_values.finish(); + + let nulls = match emit_to { + EmitTo::All => nulls, + EmitTo::First(n) => { + // split off the first N values in seen_values + // + // TODO make this more efficient rather than two + // copies and bitwise manipulation + let first_n_null: BooleanBuffer = nulls.iter().take(n).collect(); + // reset the existing seen buffer + for seen in nulls.iter().skip(n) { + self.seen_values.append(seen); + } + first_n_null + } + }; + NullBuffer::new(nulls) } } @@ -689,7 +708,7 @@ mod test { // Validate the final buffer (one value per group) let expected_null_buffer = mock.expected_null_buffer(total_num_groups); - let null_buffer = null_state.build(); + let null_buffer = null_state.build(EmitTo::All); assert_eq!(null_buffer, expected_null_buffer); } @@ -806,7 +825,7 @@ mod test { // Validate the final buffer (one value per group) let expected_null_buffer = mock.expected_null_buffer(total_num_groups); - let null_buffer = null_state.build(); + let null_buffer = null_state.build(EmitTo::All); assert_eq!(null_buffer, expected_null_buffer); }
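The `EmitTo::First(n)` arm of `NullState::build` above splits the first `n` validity bits out of the seen-values buffer and re-appends the tail to the (now reset) builder. A distilled version of just that splitting step, assuming the `arrow_buffer` crate as the only dependency; `build_first_n` is a hypothetical helper, not the patch's API:

```rust
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer};

/// Split the first `n` validity bits out of a builder, re-appending the
/// remainder so later groups keep their state -- the same two-copy
/// approach the patch's `NullState::build` uses for `EmitTo::First`.
fn build_first_n(seen_values: &mut BooleanBufferBuilder, n: usize) -> NullBuffer {
    // `finish` consumes the bits and resets the builder to empty
    let nulls: BooleanBuffer = seen_values.finish();
    // the first n bits become the emitted null mask
    let first_n: BooleanBuffer = nulls.iter().take(n).collect();
    // bits n.. are pushed back into the (now empty) builder
    for seen in nulls.iter().skip(n) {
        seen_values.append(seen);
    }
    NullBuffer::new(first_n)
}

fn main() {
    let mut builder = BooleanBufferBuilder::new(4);
    for bit in [true, false, true, true] {
        builder.append(bit);
    }
    let emitted = build_first_n(&mut builder, 2);
    assert_eq!(emitted.len(), 2);
    assert_eq!(builder.len(), 2); // two groups retained
}
```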
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs index 7b4c61fe7dc4e..b66bcd5504526 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs @@ -17,7 +17,7 @@ //! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`] -use super::GroupsAccumulator; +use super::{EmitTo, GroupsAccumulator}; use arrow::{ array::{AsArray, UInt32Builder}, compute, @@ -71,7 +71,7 @@ impl AccumulatorState { fn size(&self) -> usize { self.accumulator.size() + std::mem::size_of_val(self) - + std::mem::size_of::<u32>() * self.indices.capacity() + + self.indices.allocated_size() } } @@ -82,40 +82,29 @@ impl GroupsAccumulatorAdapter { where F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static, { - let mut new_self = Self { + Self { factory: Box::new(factory), states: vec![], allocation_bytes: 0, - }; - new_self.reset_allocation(); - new_self - } - - // Reset the allocation bytes to empty state - fn reset_allocation(&mut self) { - assert!(self.states.is_empty()); - self.allocation_bytes = std::mem::size_of::<GroupsAccumulatorAdapter>(); + } } /// Ensure that self.accumulators has total_num_groups fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> { // can't shrink assert!(total_num_groups >= self.states.len()); - let vec_size_pre = - std::mem::size_of::<AccumulatorState>() * self.states.capacity(); + let vec_size_pre = self.states.allocated_size(); // instantiate new accumulators let new_accumulators = total_num_groups - self.states.len(); for _ in 0..new_accumulators { let accumulator = (self.factory)()?; let state = AccumulatorState::new(accumulator); - self.allocation_bytes += state.size(); + self.add_allocation(state.size()); self.states.push(state); } - self.allocation_bytes += - std::mem::size_of::<AccumulatorState>() * self.states.capacity(); - self.allocation_bytes -= vec_size_pre; + self.adjust_allocation(vec_size_pre, self.states.allocated_size()); Ok(()) } @@ -204,9 +193,11 @@ impl GroupsAccumulatorAdapter { // RecordBatch(es) let iter = groups_with_rows.iter().zip(offsets.windows(2)); + let mut sizes_pre = 0; + let mut sizes_post = 0; for (&group_idx, offsets) in iter { let state = &mut self.states[group_idx]; - let size_pre = state.size(); + sizes_pre += state.size(); let values_to_accumulate = slice_and_maybe_filter(&values, opt_filter.as_ref(), offsets)?; @@ -215,12 +206,32 @@ impl GroupsAccumulatorAdapter { // clear out the state so they are empty for next // iteration state.indices.clear(); - - self.allocation_bytes += state.size(); - self.allocation_bytes -= size_pre; + sizes_post += state.size(); } + + self.adjust_allocation(sizes_pre, sizes_post); Ok(()) } + + fn add_allocation(&mut self, size: usize) { + self.allocation_bytes += size; + } + + fn free_allocation(&mut self, size: usize) { + // use saturating sub to avoid errors if the accumulators + // report erroneous sizes + self.allocation_bytes = self.allocation_bytes.saturating_sub(size) + } + + /// Adjusts the allocation for something that started with + /// `old_size` and now has `new_size`, avoiding overflow + fn adjust_allocation(&mut self, old_size: usize, new_size: usize) { + if new_size > old_size { + self.add_allocation(new_size - old_size) + } else { + self.free_allocation(old_size - new_size) + } + } } impl GroupsAccumulator for GroupsAccumulatorAdapter { @@ -243,27 +254,36 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter { Ok(()) } - fn evaluate(&mut self) -> Result<ArrayRef> { - let states = std::mem::take(&mut self.states); + fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> { + let vec_size_pre = self.states.allocated_size(); + + let states = emit_to.take_needed(&mut self.states); let results: Vec<ScalarValue> = states .into_iter() - .map(|state| state.accumulator.evaluate()) + .map(|state| { + self.free_allocation(state.size()); +
state.accumulator.evaluate() + }) .collect::<Result<_>>()?; let result = ScalarValue::iter_to_array(results); - self.reset_allocation(); + + self.adjust_allocation(vec_size_pre, self.states.allocated_size()); + result } - fn state(&mut self) -> Result<Vec<ArrayRef>> { - let states = std::mem::take(&mut self.states); + fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { + let vec_size_pre = self.states.allocated_size(); + let states = emit_to.take_needed(&mut self.states); // each accumulator produces a potential vector of values // which we need to form into columns let mut results: Vec<Vec<ScalarValue>> = vec![]; for state in states { + self.free_allocation(state.size()); let accumulator_state = state.accumulator.state()?; results.resize_with(accumulator_state.len(), Vec::new); for (idx, state_val) in accumulator_state.into_iter().enumerate() { @@ -284,8 +304,8 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter { assert_eq!(arr.len(), first_col.len()) } } + self.adjust_allocation(vec_size_pre, self.states.allocated_size()); - self.reset_allocation(); Ok(arrays) } @@ -302,7 +322,8 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter { opt_filter, total_num_groups, |accumulator, values_to_accumulate| { - accumulator.merge_batch(values_to_accumulate) + accumulator.merge_batch(values_to_accumulate)?; + Ok(()) }, )?; Ok(()) } @@ -313,6 +334,23 @@ } } +/// Extension trait for [`Vec`] to account for allocations. +pub trait VecAllocExt { + /// Item type. + type T; + /// Return the amount of memory allocated by this Vec (not + /// recursively counting any heap allocations contained within the + /// structure). Does not include the size of `self` + fn allocated_size(&self) -> usize; +} + +impl<T> VecAllocExt for Vec<T> { + type T = T; + fn allocated_size(&self) -> usize { + std::mem::size_of::<T>() * self.capacity() + } +} + fn get_filter_at_indices( opt_filter: Option<&BooleanArray>, indices: &PrimitiveArray<UInt32Type>, diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs index 83ffc3717b442..6a976cf7977e5 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs @@ -24,7 +24,7 @@ use datafusion_common::Result; use crate::GroupsAccumulator; -use super::accumulate::NullState; +use super::{accumulate::NullState, EmitTo}; /// An accumulator that implements a single operation over a /// [`BooleanArray`] where the accumulated state is also boolean (such @@ -98,15 +98,19 @@ where Ok(()) } - fn evaluate(&mut self) -> Result<ArrayRef> { + fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> { + if emit_to.is_some() { + panic!("emit_to handling not yet implemented"); + } + let values = self.values.finish(); - let nulls = self.null_state.build(); + let nulls = self.null_state.build(emit_to); let values = BooleanArray::new(values, Some(nulls)); Ok(Arc::new(values)) } - fn state(&mut self) -> Result<Vec<ArrayRef>> { - self.evaluate().map(|arr| vec![arr]) + fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { + self.evaluate(emit_to).map(|arr| vec![arr]) } fn merge_batch(
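The allocation bookkeeping `GroupsAccumulatorAdapter` now uses (measure before, measure after, apply the delta, saturate on free) can be shown in isolation. A self-contained sketch of the same pattern; `Tracker` is a stand-in for illustration, not the adapter itself:

```rust
/// Allocation bookkeeping in the style of `GroupsAccumulatorAdapter`:
/// sizes are re-measured around a mutation and the delta is applied,
/// saturating on free so a misreported size cannot underflow.
struct Tracker {
    allocation_bytes: usize,
}

impl Tracker {
    fn add_allocation(&mut self, size: usize) {
        self.allocation_bytes += size;
    }

    fn free_allocation(&mut self, size: usize) {
        // saturating sub: an erroneous size cannot wrap below zero
        self.allocation_bytes = self.allocation_bytes.saturating_sub(size);
    }

    /// Adjust for something that started at `old_size` and is now `new_size`.
    fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
        if new_size > old_size {
            self.add_allocation(new_size - old_size)
        } else {
            self.free_allocation(old_size - new_size)
        }
    }
}

fn main() {
    let mut t = Tracker { allocation_bytes: 100 };
    t.adjust_allocation(40, 64); // grew by 24
    assert_eq!(t.allocation_bytes, 124);
    t.adjust_allocation(64, 8); // shrank by 56
    assert_eq!(t.allocation_bytes, 68);
    t.free_allocation(1_000); // erroneous size: saturates at 0
    assert_eq!(t.allocation_bytes, 0);
}
```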
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index 49d62e7a93942..56d09f2a9ab6f 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -27,6 +27,46 @@ pub(crate) mod prim_op; use arrow_array::{ArrayRef, BooleanArray}; use datafusion_common::Result; +/// Describes how many rows should be emitted from an accumulator. +#[derive(Debug, Clone, Copy)] +pub enum EmitTo { + /// Emit all groups + All, + /// Emit only the first `n` groups and shift all existing group + /// indexes down by `n`. For example, if `n=10`, group_index `0, 1, + /// ... 9` are emitted and group indexes `10, 11, 12, ...` become + /// `0, 1, 2, ...`. + First(usize), +} + +impl EmitTo { + /// Does self specify something other than `All`? + pub fn is_some(&self) -> bool { + matches!(self, Self::First(_)) + } + + /// Removes the required number of rows from `v`, returning a Vec + /// with the taken elements and leaving the remaining values in `v`. + /// + /// This avoids copying if Self::All + pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> { + match self { + Self::All => { + // Take the entire vector, leave new (empty) vector + std::mem::take(v) + } + Self::First(n) => { + // split off elements n.. into t, leaving 0..n in v + let mut t = v.split_off(*n); + // swap so v holds elements n.. and t (returned) holds the first n + std::mem::swap(v, &mut t); + t + } + } + } + // TODO null state handling +} + /// `GroupAccumulator` implements a single aggregate (e.g. AVG) and /// stores the state for *all* groups internally. /// @@ -72,28 +112,33 @@ /// each group, and `evaluate` will produce that running sum as /// its output for all groups, in group_index order /// - /// The accumulator should free to release / reset it is internal - /// state after this call to the same as it was after being - /// initially created. - fn evaluate(&mut self) -> Result<ArrayRef>; + /// If `emit_to` is [`EmitTo::All`], the accumulator should return all + /// groups and release / reset its internal state to be equivalent to + /// when it was first created. + /// + /// If `emit_to` is [`EmitTo::First`], only the first `n` groups and + /// their state should be emitted; state for the remaining groups + /// must be retained for future use. The group_indices on + /// subsequent calls to `update_batch` or `merge_batch` will be + /// shifted down by `n`. For example, when emitting the first 10 + /// groups, the first 10 rows are returned and every remaining + /// group's group_index is lowered by 10: data for group_index `17` + /// will become the data for group_index `7`, and so on. + fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>; /// Returns the intermediate aggregate state for this accumulator, /// used for multi-phase grouping, resetting its internal state. /// - /// The rows returned *must* be in group_index order: The value - /// for group_index 0, followed by 1, etc. Any group_index that - /// did not have values, should be null. - /// /// For example, `AVG` might return two arrays: `SUM` and `COUNT` /// but the `MIN` aggregate would just return a single array. /// /// Note more sophisticated internal state can be passed as /// single `StructArray` rather than multiple arrays. /// - /// The accumulator should free to release / reset its internal - /// state after this call to the same as it was after being - /// initially created. - fn state(&mut self) -> Result<Vec<ArrayRef>>; + /// See [`Self::evaluate`] for details on the required output + /// order and `emit_to`. + fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>; /// Merges intermediate state (the output from [`Self::state`]) /// into this accumulator's values.
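`EmitTo::take_needed` relies on a small `split_off`/`swap` dance to return the first `n` elements while leaving the tail behind at shifted-down indexes. A standalone sketch of that trick and of the index-shifting contract described in the `evaluate` docs above (names here are illustrative):

```rust
/// Sketch of `EmitTo::take_needed`'s splitting trick on a plain Vec:
/// `split_off(n)` leaves the first n elements in `v` and returns the
/// tail; the swap flips the two so the *first* n are returned and the
/// tail stays behind, shifted down to indexes 0.. (panics if n > len).
fn take_needed<T>(n: usize, v: &mut Vec<T>) -> Vec<T> {
    let mut t = v.split_off(n); // t = v[n..], v = v[..n]
    std::mem::swap(v, &mut t); // now v = old v[n..], t = old v[..n]
    t
}

fn main() {
    let mut groups = vec!["g0", "g1", "g2", "g3", "g4"];
    let emitted = take_needed(2, &mut groups);
    assert_eq!(emitted, vec!["g0", "g1"]);
    // former groups 2..5 now sit at indexes 0..3, matching the
    // "shift group indexes down by n" contract described above
    assert_eq!(groups, vec!["g2", "g3", "g4"]);
}
```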
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs index a49651a5e3fa8..adeaea712c68f 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs @@ -24,7 +24,7 @@ use datafusion_common::Result; use crate::{aggregate::utils::adjust_output_array, GroupsAccumulator}; -use super::accumulate::NullState; +use super::{accumulate::NullState, EmitTo}; /// An accumulator that implements a single operation over /// [`ArrowPrimitiveType`] where the accumulated state is the same as @@ -112,16 +112,16 @@ where Ok(()) } - fn evaluate(&mut self) -> Result<ArrayRef> { - let values = std::mem::take(&mut self.values); - let nulls = self.null_state.build(); + fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> { + let values = emit_to.take_needed(&mut self.values); + let nulls = self.null_state.build(emit_to); let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)); // no copy adjust_output_array(&self.data_type, Arc::new(values)) } - fn state(&mut self) -> Result<Vec<ArrayRef>> { - self.evaluate().map(|arr| vec![arr]) + fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { + self.evaluate(emit_to).map(|arr| vec![arr]) } fn merge_batch( diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index b695ee169eed2..58275f3780b47 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -46,8 +46,9 @@ pub mod utils; pub mod var_provider; pub mod window; -// reexport this to maintain compatibility with anything that used from_slice previously -pub use aggregate::groups_accumulator::{GroupsAccumulator, GroupsAccumulatorAdapter}; +pub use aggregate::groups_accumulator::{ + EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter, +}; pub use aggregate::AggregateExpr; pub use equivalence::{