Skip to content

Commit

Permalink
Consolidate BoundedAggregateStream
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 15, 2023
1 parent afc9c9d commit aeac248
Show file tree
Hide file tree
Showing 16 changed files with 890 additions and 1,351 deletions.
1,072 changes: 0 additions & 1,072 deletions datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs

This file was deleted.

21 changes: 6 additions & 15 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
//! Aggregates functionalities
use crate::physical_plan::aggregates::{
bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
row_hash::GroupedHashAggregateStream,
no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
Expand All @@ -46,10 +45,9 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

mod bounded_aggregate_stream;
mod no_grouping;
mod order;
mod row_hash;
mod utils;

pub use datafusion_expr::AggregateFunction;
use datafusion_physical_expr::aggregate::is_order_sensitive;
Expand Down Expand Up @@ -89,7 +87,7 @@ pub enum AggregateMode {
/// Specifically, each distinct combination of the relevant columns
/// are contiguous in the input, and once a new combination is seen
/// previous combinations are guaranteed never to appear again
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupByOrderMode {
/// The input is not (known to be) ordered by any of the
/// expressions in the GROUP BY clause.
Expand Down Expand Up @@ -212,15 +210,15 @@ impl PartialEq for PhysicalGroupBy {
enum StreamType {
AggregateStream(AggregateStream),
GroupedHashAggregateStream(GroupedHashAggregateStream),
BoundedAggregate(BoundedAggregateStream),
//BoundedAggregate(BoundedAggregateStream),
}

impl From<StreamType> for SendableRecordBatchStream {
fn from(stream: StreamType) -> Self {
match stream {
StreamType::AggregateStream(stream) => Box::pin(stream),
StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
StreamType::BoundedAggregate(stream) => Box::pin(stream),
//StreamType::BoundedAggregate(stream) => Box::pin(stream),
}
}
}
Expand Down Expand Up @@ -719,14 +717,6 @@ impl AggregateExec {
Ok(StreamType::AggregateStream(AggregateStream::new(
self, context, partition,
)?))
} else if let Some(aggregation_ordering) = &self.aggregation_ordering {
let aggregation_ordering = aggregation_ordering.clone();
Ok(StreamType::BoundedAggregate(BoundedAggregateStream::new(
self,
context,
partition,
aggregation_ordering,
)?))
} else {
Ok(StreamType::GroupedHashAggregateStream(
GroupedHashAggregateStream::new(self, context, partition)?,
Expand Down Expand Up @@ -1105,6 +1095,7 @@ fn create_accumulators(
.collect::<Result<Vec<_>>>()
}

#[allow(dead_code)]
fn create_row_accumulators(
aggr_expr: &[Arc<dyn AggregateExpr>],
) -> Result<Vec<RowAccumulatorItem>> {
Expand Down
164 changes: 164 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/order/full.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::physical_expr::EmitTo;

/// Tracks grouping state when the data is ordered entirely by its
/// group keys
///
/// When the group values are sorted, as soon as we see group `n+1` we
/// know we will never see any rows for group `n again and thus they
/// can be emitted.
///
/// ```text
/// SUM(amt) GROUP BY id
///
/// The input is sorted by id
///
///
/// ┌─────┐ ┌──────────────────┐
/// │┌───┐│ │ ┌──────────────┐ │ ┏━━━━━━━━━━━━━━┓
/// ││ 0 ││ │ │ 123 │ │ ┌─────┃ 13 ┃
/// │└───┘│ │ └──────────────┘ │ │ ┗━━━━━━━━━━━━━━┛
/// │ ... │ │ ... │ │
/// │┌───┐│ │ ┌──────────────┐ │ │ current
/// ││12 ││ │ │ 234 │ │ │
/// │├───┤│ │ ├──────────────┤ │ │
/// ││12 ││ │ │ 234 │ │ │
/// │├───┤│ │ ├──────────────┤ │ │
/// ││13 ││ │ │ 456 │◀┼───┘
/// │└───┘│ │ └──────────────┘ │
/// └─────┘ └──────────────────┘
///
/// group indices group_values current tracks the most
/// (in group value recent group index
/// order)
/// ```
///
/// In the above diagram the current group is `13` groups `0..12` can
/// be emitted. Group `13` can not be emitted because it may have more
/// values in the next batch.
#[derive(Debug)]
pub(crate) struct GroupOrderingFull {
state: State,
/// Hash values for groups in 0..completed
hashes: Vec<u64>,
}

#[derive(Debug)]
enum State {
/// Have seen no input yet
Start,

/// Have seen all groups with indexes less than `completed_index`
InProgress {
/// index of the current group for which values are being
/// generated (can emit current - 1)
current: usize,
},

/// Seen end of input, all groups can be emitted
Complete,
}

impl GroupOrderingFull {
pub fn new() -> Self {
Self {
state: State::Start,
hashes: vec![],
}
}

// How far can data be emitted? Returns None if no data can be
// emitted
pub fn emit_to(&self) -> Option<EmitTo> {
match &self.state {
State::Start => None,
State::InProgress { current, .. } => {
// Can not emit if we are still on the first row,
// otherwise emit all rows prior to the current group
if *current == 0 {
None
} else {
Some(EmitTo::First(*current))
}
}
State::Complete { .. } => Some(EmitTo::All),
}
}

/// removes the first n groups from this ordering, shifting all
/// existing indexes down by N and returns a reference to the
/// updated hashes
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
match &mut self.state {
State::Start => panic!("invalid state: start"),
State::InProgress { current } => {
// shift down by n
assert!(*current >= n);
*current -= n;
self.hashes.drain(0..n);
}
State::Complete { .. } => panic!("invalid state: complete"),
};
&self.hashes
}

/// Note that the input is complete so any outstanding groups are done as well
pub fn input_done(&mut self) {
self.state = match self.state {
State::Start => State::Complete,
State::InProgress { .. } => State::Complete,
State::Complete => State::Complete,
};
}

/// Called when new groups are added in a batch. See documentation
/// on [`super::GroupOrdering::new_groups`]
pub fn new_groups(
&mut self,
group_indices: &[usize],
batch_hashes: &[u64],
total_num_groups: usize,
) {
assert!(total_num_groups > 0);
assert_eq!(group_indices.len(), batch_hashes.len());

// copy any hash values
self.hashes.resize(total_num_groups, 0);
for (&group_index, &hash) in group_indices.iter().zip(batch_hashes.iter()) {
self.hashes[group_index] = hash;
}

let max_group_index = total_num_groups - 1;
self.state = match self.state {
State::Start => State::InProgress {
current: max_group_index,
},
State::InProgress { current } => {
// expect to see new group indexes if we are called again
assert!(current <= max_group_index, "{current} <= {max_group_index}");
State::InProgress {
current: max_group_index,
}
}
State::Complete { .. } => {
panic!("Saw new group after input was complete");
}
};
}
}
134 changes: 134 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/order/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Order tracking for memory bounded grouping
use arrow_array::ArrayRef;
use arrow_schema::Schema;
use datafusion_common::Result;
use datafusion_physical_expr::EmitTo;

use super::{AggregationOrdering, GroupByOrderMode};

mod full;
mod partial;

pub(crate) use full::GroupOrderingFull;
pub(crate) use partial::GroupOrderingPartial;

/// Group ordering state, if present, for each group in the hash
/// table.
#[derive(Debug)]
pub(crate) enum GroupOrdering {
/// Groups are not ordered
None,
/// Groups are orderd by some pre-set of the group keys
Partial(GroupOrderingPartial),
/// Groups are entirely contiguous,
Full(GroupOrderingFull),
}

impl GroupOrdering {
/// Create a `GroupOrdering` for the ordering
pub fn try_new(
input_schema: &Schema,
ordering: &AggregationOrdering,
) -> Result<Self> {
let AggregationOrdering {
mode,
order_indices,
ordering,
} = ordering;

Ok(match mode {
GroupByOrderMode::None => GroupOrdering::None,
GroupByOrderMode::PartiallyOrdered => {
let partial =
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)?;
GroupOrdering::Partial(partial)
}
GroupByOrderMode::FullyOrdered => {
GroupOrdering::Full(GroupOrderingFull::new())
}
})
}

// How far can data be emitted based on groups seen so far?
// Returns `None` if nothing can be emitted at this point based on
// ordering information
pub fn emit_to(&self) -> Option<EmitTo> {
match self {
GroupOrdering::None => None,
GroupOrdering::Partial(partial) => partial.emit_to(),
GroupOrdering::Full(full) => full.emit_to(),
}
}

/// Updates the state the input is done
pub fn input_done(&mut self) {
match self {
GroupOrdering::None => {}
GroupOrdering::Partial(partial) => partial.input_done(),
GroupOrdering::Full(full) => full.input_done(),
}
}

/// removes the first n groups from this ordering, shifting all
/// existing indexes down by N and returns a reference to the
/// updated hashes
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
match self {
GroupOrdering::None => &[],
GroupOrdering::Partial(partial) => partial.remove_groups(n),
GroupOrdering::Full(full) => full.remove_groups(n),
}
}

/// Called when new groups are added in a batch
///
/// * `total_num_groups`: total number of groups (so max group_index is total_num_groups - 1).
///
/// * `group_values`: The group key values for *each row* in the batch
///
/// * `group_indices`: The indices for each row in the batch
///
/// * `hashes`: The hash values for each row in the batch
pub fn new_groups(
&mut self,
batch_group_values: &[ArrayRef],
group_indices: &[usize],
batch_hashes: &[u64],
total_num_groups: usize,
) -> Result<()> {
match self {
GroupOrdering::None => {}
GroupOrdering::Partial(partial) => {
partial.new_groups(
batch_group_values,
group_indices,
batch_hashes,
total_num_groups,
)?;
}

GroupOrdering::Full(full) => {
full.new_groups(group_indices, batch_hashes, total_num_groups);
}
};
Ok(())
}
}
Loading

0 comments on commit aeac248

Please sign in to comment.