Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip partial aggregation based on the cardinality of hash value instead of group values #12697

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions datafusion/core/tests/data/aggregate_mixed_type.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
c1,c2
1,'a'
2,'b'
3,'c'
4,'d'
1,'a'
2,'b'
3,'c'
4,'d'
4,'d'
3,'c'
3,'c'
5,'e'
6,'f'
7,'g'
8,'a'
9,'b'
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
_batch_hashes: &[u64],
) -> datafusion_common::Result<()> {
assert_eq!(cols.len(), 1);

Expand Down Expand Up @@ -108,7 +109,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {

self.num_groups = 0;
let mut group_indexes = vec![];
self.intern(&[remaining_group_values], &mut group_indexes)?;
self.intern(&[remaining_group_values], &mut group_indexes, &[])?;

// Verify that the group indexes were assigned in the correct order
assert_eq!(0, group_indexes[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl GroupValues for GroupValuesBytesView {
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
_batch_hashes: &[u64],
) -> datafusion_common::Result<()> {
assert_eq!(cols.len(), 1);

Expand Down Expand Up @@ -109,7 +110,8 @@ impl GroupValues for GroupValuesBytesView {

self.num_groups = 0;
let mut group_indexes = vec![];
self.intern(&[remaining_group_values], &mut group_indexes)?;
// Change mode
self.intern(&[remaining_group_values], &mut group_indexes, &[])?;

// Verify that the group indexes were assigned in the correct order
assert_eq!(0, group_indexes[0]);
Expand Down
21 changes: 6 additions & 15 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::aggregates::group_values::group_column::{
ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder,
};
use crate::aggregates::group_values::GroupValues;
use ahash::RandomState;
use arrow::compute::cast;
use arrow::datatypes::{
Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
Expand All @@ -28,7 +27,6 @@ use arrow::datatypes::{
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, ArrayRef};
use arrow_schema::{DataType, Schema, SchemaRef};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
Expand Down Expand Up @@ -68,9 +66,6 @@ pub struct GroupValuesColumn {

/// reused buffer to store hashes
hashes_buffer: Vec<u64>,

/// Random state for creating hashes
random_state: RandomState,
}

impl GroupValuesColumn {
Expand All @@ -83,7 +78,6 @@ impl GroupValuesColumn {
map_size: 0,
group_values: vec![],
hashes_buffer: Default::default(),
random_state: Default::default(),
})
}

Expand Down Expand Up @@ -124,9 +118,12 @@ impl GroupValuesColumn {
}

impl GroupValues for GroupValuesColumn {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
let n_rows = cols[0].len();

fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
batch_hashes: &[u64],
) -> Result<()> {
if self.group_values.is_empty() {
let mut v = Vec::with_capacity(cols.len());

Expand Down Expand Up @@ -208,12 +205,6 @@ impl GroupValues for GroupValuesColumn {
// tracks to which group each of the input rows belongs
groups.clear();

// 1.1 Calculate the group keys for the group values
let batch_hashes = &mut self.hashes_buffer;
batch_hashes.clear();
batch_hashes.resize(n_rows, 0);
create_hashes(cols, &self.random_state, batch_hashes)?;

for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| {
// Somewhat surprisingly, this closure can be called even if the
Expand Down
7 changes: 6 additions & 1 deletion datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ pub trait GroupValues: Send {
/// If a row has the same value as a previous row, the same group id is
/// assigned. If a row has a new value, the next available group id is
/// assigned.
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;
fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
batch_hashes: &[u64],
) -> Result<()>;

/// Returns the number of bytes of memory used by this [`GroupValues`]
fn size(&self) -> usize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ impl<T: ArrowPrimitiveType> GroupValues for GroupValuesPrimitive<T>
where
T::Native: HashValue,
{
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
_batch_hashes: &[u64],
) -> Result<()> {
assert_eq!(cols.len(), 1);
groups.clear();

Expand Down
7 changes: 6 additions & 1 deletion datafusion/physical-plan/src/aggregates/group_values/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ impl GroupValuesRows {
}

impl GroupValues for GroupValuesRows {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
_batch_hashes: &[u64],
) -> Result<()> {
// Convert the group keys into the row format
let group_rows = &mut self.rows_buffer;
group_rows.clear();
Expand Down
Loading
Loading