From c6f2b234fc76b804805f8685d88ae956475da73f Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Fri, 25 Oct 2024 13:41:51 +0200 Subject: [PATCH 1/4] wip --- Cargo.lock | 1 + .../polars-core/src/hashing/vector_hasher.rs | 6 +- crates/polars-expr/Cargo.toml | 1 + crates/polars-expr/src/groups/mod.rs | 8 +- crates/polars-expr/src/groups/row_encoded.rs | 91 ++++++++++++-- crates/polars-expr/src/reduce/len.rs | 14 +++ crates/polars-expr/src/reduce/min_max.rs | 51 ++++++++ crates/polars-expr/src/reduce/mod.rs | 55 +++++++++ crates/polars-expr/src/reduce/partition.rs | 114 ++++++++++++++++++ crates/polars-expr/src/reduce/sum.rs | 17 +++ crates/polars-stream/src/nodes/group_by.rs | 82 +++++++++++-- crates/polars-utils/src/hashing.rs | 5 + 12 files changed, 417 insertions(+), 28 deletions(-) create mode 100644 crates/polars-expr/src/reduce/partition.rs diff --git a/Cargo.lock b/Cargo.lock index dbd7af500fcd..786a96d23fdb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2891,6 +2891,7 @@ dependencies = [ "polars-row", "polars-time", "polars-utils", + "rand", "rayon", ] diff --git a/crates/polars-core/src/hashing/vector_hasher.rs b/crates/polars-core/src/hashing/vector_hasher.rs index 7dfb07c64d58..e00e45f1ede8 100644 --- a/crates/polars-core/src/hashing/vector_hasher.rs +++ b/crates/polars-core/src/hashing/vector_hasher.rs @@ -1,4 +1,5 @@ use arrow::bitmap::utils::get_bit_unchecked; +use polars_utils::hashing::folded_multiply; use polars_utils::total_ord::{ToTotalOrd, TotalHash}; use rayon::prelude::*; use xxhash_rust::xxh3::xxh3_64_with_seed; @@ -30,11 +31,6 @@ pub trait VecHash { } } -pub(crate) const fn folded_multiply(s: u64, by: u64) -> u64 { - let result = (s as u128).wrapping_mul(by as u128); - ((result & 0xffff_ffff_ffff_ffff) as u64) ^ ((result >> 64) as u64) -} - pub(crate) fn get_null_hash_value(random_state: &PlRandomState) -> u64 { // we just start with a large prime number and hash that twice // to get a constant hash value for null/None diff --git 
a/crates/polars-expr/Cargo.toml b/crates/polars-expr/Cargo.toml index 0911445617aa..29aa34652146 100644 --- a/crates/polars-expr/Cargo.toml +++ b/crates/polars-expr/Cargo.toml @@ -24,6 +24,7 @@ polars-plan = { workspace = true } polars-row = { workspace = true } polars-time = { workspace = true, optional = true } polars-utils = { workspace = true } +rand = { workspace = true } rayon = { workspace = true } [features] diff --git a/crates/polars-expr/src/groups/mod.rs b/crates/polars-expr/src/groups/mod.rs index 5eb32b34a052..37103006de7a 100644 --- a/crates/polars-expr/src/groups/mod.rs +++ b/crates/polars-expr/src/groups/mod.rs @@ -23,7 +23,7 @@ pub trait Grouper: Any + Send { /// the ith group of other now has group index group_idxs[i] in self. fn combine(&mut self, other: &dyn Grouper, group_idxs: &mut Vec); - /// Partitions this Grouper into the given partitions. + /// Partitions this Grouper into the given number of partitions. /// /// Updates partition_idxs and group_idxs such that the ith group of self /// has group index group_idxs[i] in partition partition_idxs[i]. @@ -31,13 +31,13 @@ pub trait Grouper: Any + Send { /// It is guaranteed that two equal keys in two independent partition_into /// calls map to the same partition index if the seed and the number of /// partitions is equal. - fn partition_into( + fn partition( &self, seed: u64, - partitions: &mut [Box], + num_partitions: usize, partition_idxs: &mut Vec, group_idxs: &mut Vec, - ); + ) -> Vec>; /// Returns the keys in this Grouper in group order, that is the key for /// group i is returned in row i. 
diff --git a/crates/polars-expr/src/groups/row_encoded.rs b/crates/polars-expr/src/groups/row_encoded.rs index 46ec956106a5..6745d8952b47 100644 --- a/crates/polars-expr/src/groups/row_encoded.rs +++ b/crates/polars-expr/src/groups/row_encoded.rs @@ -4,8 +4,10 @@ use hashbrown::hash_table::{Entry, HashTable}; use polars_core::chunked_array::ops::row_encode::_get_rows_encoded_unordered; use polars_row::EncodingField; use polars_utils::aliases::PlRandomState; +use polars_utils::hashing::{folded_multiply, hash_to_partition}; use polars_utils::itertools::Itertools; use polars_utils::vec::PushUnchecked; +use rand::Rng; use super::*; @@ -27,7 +29,13 @@ pub struct RowEncodedHashGrouper { key_schema: Arc, table: HashTable, key_data: Vec, - random_state: PlRandomState, + + // Used for computing canonical hashes. + random_state: PlRandomState, + + // Internal random seed used to keep hash iteration order decorrelated. + // We simply store a random odd number and multiply the canonical hash by it. + seed: u64, } impl RowEncodedHashGrouper { @@ -35,6 +43,7 @@ impl RowEncodedHashGrouper { Self { key_schema, random_state, + seed: rand::random::() | 1, ..Default::default() } } @@ -42,9 +51,9 @@ impl RowEncodedHashGrouper { fn insert_key(&mut self, hash: u64, key: &[u8]) -> IdxSize { let num_groups = self.table.len(); let entry = self.table.entry( - hash, + hash.wrapping_mul(self.seed), |g| unsafe { hash == g.key_hash && key == g.key(&self.key_data) }, - |g| g.key_hash, + |g| g.key_hash.wrapping_mul(self.seed), ); match entry { @@ -64,6 +73,20 @@ impl RowEncodedHashGrouper { } } + /// Insert a key, without checking that it is unique. 
+ fn insert_key_unique(&mut self, hash: u64, key: &[u8]) -> IdxSize { + let group_idx = self.table.len().try_into().unwrap(); + let group = Group { + key_hash: hash, + key_offset: self.key_data.len(), + key_length: key.len().try_into().unwrap(), + group_idx, + }; + self.key_data.extend(key); + self.table.insert_unique(hash.wrapping_mul(self.seed), group, |g| g.key_hash.wrapping_mul(self.seed)); + group_idx + } + fn finalize_keys(&self, mut key_rows: Vec<&[u8]>) -> DataFrame { let key_dtypes = self .key_schema @@ -125,7 +148,8 @@ impl Grouper for RowEncodedHashGrouper { fn combine(&mut self, other: &dyn Grouper, group_idxs: &mut Vec) { let other = other.as_any().downcast_ref::().unwrap(); - self.table.reserve(other.table.len(), |g| g.key_hash); // TODO: cardinality estimation. + // TODO: cardinality estimation. + self.table.reserve(other.table.len(), |g| g.key_hash.wrapping_mul(self.seed)); unsafe { group_idxs.clear(); @@ -167,14 +191,59 @@ impl Grouper for RowEncodedHashGrouper { ) } - fn partition_into( + fn partition( &self, - _seed: u64, - _partitions: &mut [Box], - _partition_idxs: &mut Vec, - _group_idxs: &mut Vec, - ) { - unimplemented!() + seed: u64, + num_partitions: usize, + partition_idxs: &mut Vec, + group_idxs: &mut Vec, + ) -> Vec> { + assert!(num_partitions > 0); + + // Two-pass algorithm to prevent reallocations. 
+ let mut partition_size = vec![(0, 0); num_partitions]; // (keys, bytes) + unsafe { + for group in self.table.iter() { + let ph = folded_multiply(group.key_hash, seed | 1); + let p_idx = hash_to_partition(ph, num_partitions); + let (p_keys, p_bytes) = partition_size.get_unchecked_mut(p_idx as usize); + *p_keys += 1; + *p_bytes += group.key_length as usize; + } + } + + let mut rng = rand::thread_rng(); + let mut partitions = partition_size + .into_iter() + .map(|(keys, bytes)| Self { + key_schema: self.key_schema.clone(), + table: HashTable::with_capacity(keys), + key_data: Vec::with_capacity(bytes), + random_state: self.random_state.clone(), + seed: rng.gen::() | 1, + }) + .collect_vec(); + + unsafe { + partition_idxs.clear(); + group_idxs.clear(); + partition_idxs.reserve(self.table.len()); + group_idxs.reserve(self.table.len()); + let partition_idxs_out = partition_idxs.spare_capacity_mut(); + let group_idxs_out = group_idxs.spare_capacity_mut(); + for group in self.table.iter() { + let ph = folded_multiply(group.key_hash, seed | 1); + let p_idx = hash_to_partition(ph, num_partitions); + let p = partitions.get_unchecked_mut(p_idx); + let group_idx = p.insert_key_unique(group.key_hash, group.key(&self.key_data)); + *partition_idxs_out.get_unchecked_mut(group.group_idx as usize) = MaybeUninit::new(p_idx as IdxSize); + *group_idxs_out.get_unchecked_mut(group.group_idx as usize) = MaybeUninit::new(group_idx); + } + partition_idxs.set_len(self.table.len()); + group_idxs.set_len(self.table.len()); + } + + partitions.into_iter().map(|p| Box::new(p) as _).collect() } fn as_any(&self) -> &dyn Any { diff --git a/crates/polars-expr/src/reduce/len.rs b/crates/polars-expr/src/reduce/len.rs index fa5aedb91f18..99ae57323a75 100644 --- a/crates/polars-expr/src/reduce/len.rs +++ b/crates/polars-expr/src/reduce/len.rs @@ -1,5 +1,7 @@ use polars_core::error::constants::LENGTH_LIMIT_MSG; +use crate::reduce::partition::partition_vec; + use super::*; #[derive(Default)] @@ -60,6 
+62,18 @@ impl GroupedReduction for LenReduce { .collect_ca(PlSmallStr::EMPTY); Ok(ca.into_series()) } + + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + group_idxs: &[IdxSize], + ) -> Vec> { + partition_vec(self.groups, partition_sizes, partition_idxs, group_idxs) + .into_iter() + .map(|groups| Box::new(Self { groups }) as _) + .collect() + } fn as_any(&self) -> &dyn Any { self diff --git a/crates/polars-expr/src/reduce/min_max.rs b/crates/polars-expr/src/reduce/min_max.rs index f4541d7a88a1..38c148c16206 100644 --- a/crates/polars-expr/src/reduce/min_max.rs +++ b/crates/polars-expr/src/reduce/min_max.rs @@ -11,6 +11,7 @@ use polars_utils::float::IsFloat; use polars_utils::min_max::MinMax; use super::*; +use crate::reduce::partition::partition_mask; pub fn new_min_reduction(dtype: DataType, propagate_nans: bool) -> Box { use DataType::*; @@ -344,6 +345,31 @@ impl GroupedReduction for BoolMinGroupedReduction { Ok(()) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + group_idxs: &[IdxSize], + ) -> Vec> { + let p_values = partition_mask( + &self.values.freeze(), + partition_sizes, + partition_idxs, + group_idxs, + ); + let p_mask = partition_mask( + &self.mask.freeze(), + partition_sizes, + partition_idxs, + group_idxs, + ); + p_values + .into_iter() + .zip(p_mask) + .map(|(values, mask)| Box::new(Self { values, mask }) as _) + .collect() + } + fn finalize(&mut self) -> PolarsResult { let v = core::mem::take(&mut self.values); let m = core::mem::take(&mut self.mask); @@ -450,6 +476,31 @@ impl GroupedReduction for BoolMaxGroupedReduction { }) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + group_idxs: &[IdxSize], + ) -> Vec> { + let p_values = partition_mask( + &self.values.freeze(), + partition_sizes, + partition_idxs, + group_idxs, + ); + let p_mask = partition_mask( + &self.mask.freeze(), + partition_sizes, + 
partition_idxs, + group_idxs, + ); + p_values + .into_iter() + .zip(p_mask) + .map(|(values, mask)| Box::new(Self { values, mask }) as _) + .collect() + } + fn as_any(&self) -> &dyn Any { self } diff --git a/crates/polars-expr/src/reduce/mod.rs b/crates/polars-expr/src/reduce/mod.rs index 170ee6abff6c..a5199277cd83 100644 --- a/crates/polars-expr/src/reduce/mod.rs +++ b/crates/polars-expr/src/reduce/mod.rs @@ -2,6 +2,7 @@ mod convert; mod len; mod mean; mod min_max; +mod partition; mod sum; mod var_std; @@ -49,6 +50,23 @@ pub trait GroupedReduction: Any + Send { group_idxs: &[IdxSize], ) -> PolarsResult<()>; + /// Partitions this GroupedReduction into several partitions. + /// + /// The ith group of this GroupedReduction should become the group_idxs[i] + /// group in partition partition_idxs[i]. + /// + /// # Safety + /// partition_idxs[i] < partition_sizes.len() for all i. + /// group_idxs[i] < partition_sizes[partition_idxs[i]] for all i. + /// Each partition p has an associated set of group_idxs, this set contains + /// 0..partition_sizes[p] exactly once. + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + group_idxs: &[IdxSize], + ) -> Vec>; + /// Returns the finalized value per group as a Series. /// /// After this operation the number of groups is reset to 0. 
@@ -245,6 +263,24 @@ where Ok(()) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + group_idxs: &[IdxSize], + ) -> Vec> { + partition::partition_vec(self.values, partition_sizes, partition_idxs, group_idxs) + .into_iter() + .map(|values| { + Box::new(Self { + values, + in_dtype: self.in_dtype.clone(), + reducer: self.reducer.clone(), + }) as _ + }) + .collect() + } + fn finalize(&mut self) -> PolarsResult { let v = core::mem::take(&mut self.values); self.reducer.finish(v, None, &self.in_dtype) @@ -353,6 +389,25 @@ where Ok(()) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + group_idxs: &[IdxSize], + ) -> Vec> { + partition::partition_vec_mask(self.values, &self.mask.freeze(), partition_sizes, partition_idxs, group_idxs) + .into_iter() + .map(|(values, mask)| { + Box::new(Self { + values, + mask, + in_dtype: self.in_dtype.clone(), + reducer: self.reducer.clone(), + }) as _ + }) + .collect() + } + fn finalize(&mut self) -> PolarsResult { let v = core::mem::take(&mut self.values); let m = core::mem::take(&mut self.mask); diff --git a/crates/polars-expr/src/reduce/partition.rs b/crates/polars-expr/src/reduce/partition.rs new file mode 100644 index 000000000000..70c7d4fea2a6 --- /dev/null +++ b/crates/polars-expr/src/reduce/partition.rs @@ -0,0 +1,114 @@ +use arrow::bitmap::{Bitmap, MutableBitmap}; +use polars_utils::itertools::Itertools; +use polars_utils::IdxSize; + +/// Partitions this Vec into multiple Vecs. +/// +/// # Safety +/// partition_idxs[i] < partition_sizes.len() for all i. +/// idx_in_partition[i] < partition_sizes[partition_idxs[i]] for all i. +/// Each partition p has an associated set of idx_in_partition, this set +/// contains 0..partition_sizes[p] exactly once. 
+pub unsafe fn partition_vec( + v: Vec, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + idx_in_partition: &[IdxSize], +) -> Vec> { + assert!(idx_in_partition.len() == v.len()); + assert!(partition_idxs.len() == v.len()); + + let mut partitions = partition_sizes + .iter() + .map(|sz| Vec::::with_capacity(*sz as usize)) + .collect_vec(); + + unsafe { + // Scatter into each partition. + for (i, val) in v.into_iter().enumerate() { + let p_idx = *partition_idxs.get_unchecked(i) as usize; + let idx_in_p = *idx_in_partition.get_unchecked(i) as usize; + debug_assert!(p_idx < partitions.len()); + let p = partitions.get_unchecked_mut(p_idx); + debug_assert!(idx_in_p < p.capacity()); + p.as_mut_ptr().add(idx_in_p).write(val); + } + + for (p, sz) in partitions.iter_mut().zip(partition_sizes) { + p.set_len(*sz as usize); + } + } + + partitions +} + +/// # Safety +/// Same as partition_vec. +pub unsafe fn partition_mask( + m: &Bitmap, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + idx_in_partition: &[IdxSize], +) -> Vec { + assert!(idx_in_partition.len() == m.len()); + assert!(partition_idxs.len() == m.len()); + + let mut partitions = partition_sizes + .iter() + .map(|sz| MutableBitmap::from_len_zeroed(*sz as usize)) + .collect_vec(); + + unsafe { + // Scatter into each partition. + for i in 0..m.len() { + let p_idx = *partition_idxs.get_unchecked(i) as usize; + let idx_in_p = *idx_in_partition.get_unchecked(i) as usize; + debug_assert!(p_idx < partitions.len()); + let p = partitions.get_unchecked_mut(p_idx); + debug_assert!(idx_in_p < p.capacity()); + p.set_unchecked(idx_in_p, m.get_bit_unchecked(i)); + } + } + + partitions +} + +/// A fused loop of partition_vec and partition_mask. +/// # Safety +/// Same as partition_vec. 
+pub unsafe fn partition_vec_mask( + v: Vec, + m: &Bitmap, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + idx_in_partition: &[IdxSize], +) -> Vec<(Vec, MutableBitmap)> { + assert!(idx_in_partition.len() == v.len()); + assert!(partition_idxs.len() == v.len()); + assert!(m.len() == v.len()); + + let mut partitions = partition_sizes + .iter() + .map(|sz| (Vec::::with_capacity(*sz as usize), MutableBitmap::from_len_zeroed(*sz as usize))) + .collect_vec(); + + unsafe { + // Scatter into each partition. + for (i, val) in v.into_iter().enumerate() { + let p_idx = *partition_idxs.get_unchecked(i) as usize; + let idx_in_p = *idx_in_partition.get_unchecked(i) as usize; + debug_assert!(p_idx < partitions.len()); + let (pv, pm) = partitions.get_unchecked_mut(p_idx); + debug_assert!(idx_in_p < pv.capacity()); + debug_assert!(idx_in_p < pm.len()); + pv.as_mut_ptr().add(idx_in_p).write(val); + pm.set_unchecked(idx_in_p, m.get_bit_unchecked(i)); + } + + for (p, sz) in partitions.iter_mut().zip(partition_sizes) { + p.0.set_len(*sz as usize); + } + } + + partitions +} diff --git a/crates/polars-expr/src/reduce/sum.rs b/crates/polars-expr/src/reduce/sum.rs index 111d69eec4f2..0e36b038c331 100644 --- a/crates/polars-expr/src/reduce/sum.rs +++ b/crates/polars-expr/src/reduce/sum.rs @@ -126,6 +126,23 @@ where Ok(()) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + group_idxs: &[IdxSize], + ) -> Vec> { + partition::partition_vec(self.sums, partition_sizes, partition_idxs, group_idxs) + .into_iter() + .map(|sums| { + Box::new(Self { + sums, + in_dtype: self.in_dtype.clone(), + }) as _ + }) + .collect() + } + fn finalize(&mut self) -> PolarsResult { let v = core::mem::take(&mut self.sums); let arr = Box::new(PrimitiveArray::::from_vec(v)); diff --git a/crates/polars-stream/src/nodes/group_by.rs b/crates/polars-stream/src/nodes/group_by.rs index 6954263bb99b..eb03ef45ef85 100644 --- 
a/crates/polars-stream/src/nodes/group_by.rs +++ b/crates/polars-stream/src/nodes/group_by.rs @@ -1,9 +1,14 @@ +use std::mem::ManuallyDrop; use std::sync::Arc; use polars_core::prelude::IntoColumn; use polars_core::schema::Schema; +use polars_core::utils::accumulate_dataframes_vertical_unchecked; use polars_expr::groups::Grouper; use polars_expr::reduce::GroupedReduction; +use polars_utils::itertools::Itertools; +use polars_utils::sync::SyncPtr; +use rayon::prelude::*; use super::compute_node_prelude::*; use crate::async_primitives::connector::Receiver; @@ -76,13 +81,14 @@ impl GroupBySinkState { })); } } - - fn into_source(mut self, output_schema: &Schema) -> PolarsResult { - // TODO: parallelize this with partitions. + + fn combine_locals( + output_schema: &Schema, + mut locals: Vec, + ) -> PolarsResult { let mut group_idxs = Vec::new(); - let num_pipelines = self.local.len(); - let mut combined = self.local.pop().unwrap(); - for local in self.local { + let mut combined = locals.pop().unwrap(); + for local in locals { combined.grouper.combine(&*local.grouper, &mut group_idxs); for (l, r) in combined .grouped_reductions @@ -102,10 +108,70 @@ impl GroupBySinkState { out.with_column_unchecked(r.finalize()?.with_name(name.clone()).into_column()); } } - let mut source_node = InMemorySourceNode::new(Arc::new(out)); - source_node.initialize(num_pipelines); + Ok(out) + } + + fn into_source_parallel(self, output_schema: &Schema) -> PolarsResult { + let num_partitions = self.local.len(); + let seed = 0xdeadbeef; + let partitioned_locals: Vec<_> = self + .local + .into_par_iter() + .with_max_len(1) + .map(|local| { + let mut group_idxs = Vec::new(); + let mut partition_idxs = Vec::new(); + let p_groupers = local.grouper.partition( + seed, + num_partitions, + &mut partition_idxs, + &mut group_idxs, + ); + let partition_sizes = p_groupers.iter().map(|g| g.num_groups()).collect_vec(); + let grouped_reductions_p = local + .grouped_reductions + .into_iter() + .map(|r| unsafe { + 
r.partition(&partition_sizes, &partition_idxs, &group_idxs) + }) + .collect_vec(); + (p_groupers, grouped_reductions_p) + }) + .collect(); + + let frames = unsafe { + let mut partitioned_locals = ManuallyDrop::new(partitioned_locals); + let partitioned_locals_ptr = SyncPtr::new(partitioned_locals.as_mut_ptr()); + (0..num_partitions).into_par_iter().with_max_len(1).map(|p| { + let locals_in_p = (0..num_partitions).map(|l| { + let partitioned_local = &*partitioned_locals_ptr.get().add(l); + let (p_groupers, grouped_reductions_p) = partitioned_local; + LocalGroupBySinkState { + grouper: p_groupers.as_ptr().add(p).read(), + grouped_reductions: grouped_reductions_p.iter().map(|r| r.as_ptr().add(p).read()).collect(), + } + }).collect(); + Self::combine_locals(output_schema, locals_in_p) + }).collect::>>() + }; + + let df = accumulate_dataframes_vertical_unchecked(frames?); + let mut source_node = InMemorySourceNode::new(Arc::new(df)); + source_node.initialize(num_partitions); Ok(source_node) } + + fn into_source(self, output_schema: &Schema) -> PolarsResult { + if std::env::var("POLARS_PARALLEL_GROUPBY_FINALIZE").as_deref() == Ok("1") { + self.into_source_parallel(output_schema) + } else { + let num_pipelines = self.local.len(); + let df = Self::combine_locals(output_schema, self.local); + let mut source_node = InMemorySourceNode::new(Arc::new(df?)); + source_node.initialize(num_pipelines); + Ok(source_node) + } + } } enum GroupByState { diff --git a/crates/polars-utils/src/hashing.rs b/crates/polars-utils/src/hashing.rs index 12e59bf52f26..17ba1686c067 100644 --- a/crates/polars-utils/src/hashing.rs +++ b/crates/polars-utils/src/hashing.rs @@ -2,6 +2,11 @@ use std::hash::{Hash, Hasher}; use crate::nulls::IsNull; +pub const fn folded_multiply(a: u64, b: u64) -> u64 { + let full = (a as u128).wrapping_mul(b as u128); + (full as u64) ^ ((full >> 64) as u64) +} + /// Contains a byte slice and a precomputed hash for that string. 
/// During rehashes, we will rehash the hash instead of the string, that makes /// rehashing cheap and allows cache coherent small hash tables. From 91ad500225941215a99fa41948c52d7759c1c6ba Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Fri, 25 Oct 2024 15:50:12 +0200 Subject: [PATCH 2/4] add bitmapbuilder --- crates/polars-arrow/src/bitmap/builder.rs | 100 ++++++++++++++++++++++ crates/polars-arrow/src/bitmap/mod.rs | 3 + 2 files changed, 103 insertions(+) create mode 100644 crates/polars-arrow/src/bitmap/builder.rs diff --git a/crates/polars-arrow/src/bitmap/builder.rs b/crates/polars-arrow/src/bitmap/builder.rs new file mode 100644 index 000000000000..40e9ebcd1827 --- /dev/null +++ b/crates/polars-arrow/src/bitmap/builder.rs @@ -0,0 +1,100 @@ +use crate::bitmap::{Bitmap, MutableBitmap}; +use crate::storage::SharedStorage; + +/// Used to build bitmaps bool-by-bool in sequential order. +#[derive(Default, Clone)] +pub struct BitmapBuilder { + buf: u64, + len: usize, + cap: usize, + set_bits: usize, + bytes: Vec, +} + +impl BitmapBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn capacity(&self) -> usize { + self.cap + } + + pub fn with_capacity(bits: usize) -> Self { + let bytes = Vec::with_capacity(bits.div_ceil(64) * 8); + let words_available = bytes.capacity() / 8; + Self { + buf: 0, + len: 0, + cap: words_available * 64, + set_bits: 0, + bytes + } + } + + #[inline(always)] + pub fn reserve(&mut self, additional: usize) { + if self.len + additional > self.cap { + self.reserve_slow(additional) + } + } + + #[cold] + #[inline(never)] + fn reserve_slow(&mut self, additional: usize) { + let bytes_needed = (self.len + additional).div_ceil(64) * 8; + self.bytes.reserve(bytes_needed - self.bytes.capacity()); + let words_available = self.bytes.capacity() / 8; + self.cap = words_available * 64; + } + + #[inline(always)] + pub fn push(&mut self, x: bool) { + self.reserve(1); + unsafe { 
self.push_unchecked(x) } + } + + /// # Safety + /// self.len() < self.capacity() must hold. + #[inline(always)] + pub unsafe fn push_unchecked(&mut self, x: bool) { + debug_assert!(self.len < self.cap); + self.buf |= (x as u64) << (self.len % 64); + self.len += 1; + if self.len % 64 == 0 { + let p = self.bytes.as_mut_ptr().add(self.bytes.len()).cast::(); + p.write_unaligned(self.buf.to_le()); + self.bytes.set_len(self.bytes.len() + 8); + self.set_bits += self.buf.count_ones() as usize; + self.buf = 0; + } + } + + /// # Safety + /// May only be called once at the end. + unsafe fn finish(&mut self) { + if self.len % 64 != 0 { + self.bytes.extend_from_slice(&self.buf.to_le_bytes()); + self.set_bits += self.buf.count_ones() as usize; + } + } + + pub fn into_mut(mut self) -> MutableBitmap { + unsafe { + self.finish(); + MutableBitmap::from_vec(self.bytes, self.len) + } + } + + pub fn freeze(mut self) -> Bitmap { + unsafe { + self.finish(); + let storage = SharedStorage::from_vec(self.bytes); + Bitmap::from_inner_unchecked(storage, 0, self.len, Some(self.len - self.set_bits)) + } + } +} \ No newline at end of file diff --git a/crates/polars-arrow/src/bitmap/mod.rs b/crates/polars-arrow/src/bitmap/mod.rs index e7ed5fa363e8..2191fb2e3faa 100644 --- a/crates/polars-arrow/src/bitmap/mod.rs +++ b/crates/polars-arrow/src/bitmap/mod.rs @@ -19,3 +19,6 @@ pub use assign_ops::*; pub mod utils; pub mod bitmask; + +mod builder; +pub use builder::*; \ No newline at end of file From 76d9d459abf675d5e776abf0f9e8dc8dbb6e6770 Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Fri, 25 Oct 2024 15:50:36 +0200 Subject: [PATCH 3/4] avoid unnecessary materialization of group indices --- crates/polars-expr/src/groups/mod.rs | 5 ++- crates/polars-expr/src/groups/row_encoded.rs | 8 +---- crates/polars-expr/src/reduce/len.rs | 3 +- crates/polars-expr/src/reduce/min_max.rs | 10 ++---- crates/polars-expr/src/reduce/mod.rs | 9 ++---- crates/polars-expr/src/reduce/partition.rs | 34 
++++++-------------- crates/polars-expr/src/reduce/sum.rs | 3 +- crates/polars-stream/src/nodes/group_by.rs | 4 +-- 8 files changed, 21 insertions(+), 55 deletions(-) diff --git a/crates/polars-expr/src/groups/mod.rs b/crates/polars-expr/src/groups/mod.rs index 37103006de7a..43091244c661 100644 --- a/crates/polars-expr/src/groups/mod.rs +++ b/crates/polars-expr/src/groups/mod.rs @@ -25,8 +25,8 @@ pub trait Grouper: Any + Send { /// Partitions this Grouper into the given number of partitions. /// - /// Updates partition_idxs and group_idxs such that the ith group of self - /// has group index group_idxs[i] in partition partition_idxs[i]. + /// Updates partition_idxs such that the ith group of self moves to partition + /// partition_idxs[i]. /// /// It is guaranteed that two equal keys in two independent partition_into /// calls map to the same partition index if the seed and the number of @@ -36,7 +36,6 @@ pub trait Grouper: Any + Send { seed: u64, num_partitions: usize, partition_idxs: &mut Vec, - group_idxs: &mut Vec, ) -> Vec>; /// Returns the keys in this Grouper in group order, that is the key for diff --git a/crates/polars-expr/src/groups/row_encoded.rs b/crates/polars-expr/src/groups/row_encoded.rs index 6745d8952b47..52b1c3aaa348 100644 --- a/crates/polars-expr/src/groups/row_encoded.rs +++ b/crates/polars-expr/src/groups/row_encoded.rs @@ -196,7 +196,6 @@ impl Grouper for RowEncodedHashGrouper { seed: u64, num_partitions: usize, partition_idxs: &mut Vec, - group_idxs: &mut Vec, ) -> Vec> { assert!(num_partitions > 0); @@ -226,21 +225,16 @@ impl Grouper for RowEncodedHashGrouper { unsafe { partition_idxs.clear(); - group_idxs.clear(); partition_idxs.reserve(self.table.len()); - group_idxs.reserve(self.table.len()); let partition_idxs_out = partition_idxs.spare_capacity_mut(); - let group_idxs_out = group_idxs.spare_capacity_mut(); for group in self.table.iter() { let ph = folded_multiply(group.key_hash, seed | 1); let p_idx = hash_to_partition(ph, 
num_partitions); let p = partitions.get_unchecked_mut(p_idx); - let group_idx = p.insert_key_unique(group.key_hash, group.key(&self.key_data)); + p.insert_key_unique(group.key_hash, group.key(&self.key_data)); *partition_idxs_out.get_unchecked_mut(group.group_idx as usize) = MaybeUninit::new(p_idx as IdxSize); - *group_idxs_out.get_unchecked_mut(group.group_idx as usize) = MaybeUninit::new(group_idx); } partition_idxs.set_len(self.table.len()); - group_idxs.set_len(self.table.len()); } partitions.into_iter().map(|p| Box::new(p) as _).collect() diff --git a/crates/polars-expr/src/reduce/len.rs b/crates/polars-expr/src/reduce/len.rs index 99ae57323a75..89bd4ca3a5aa 100644 --- a/crates/polars-expr/src/reduce/len.rs +++ b/crates/polars-expr/src/reduce/len.rs @@ -67,9 +67,8 @@ impl GroupedReduction for LenReduce { self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { - partition_vec(self.groups, partition_sizes, partition_idxs, group_idxs) + partition_vec(self.groups, partition_sizes, partition_idxs) .into_iter() .map(|groups| Box::new(Self { groups }) as _) .collect() diff --git a/crates/polars-expr/src/reduce/min_max.rs b/crates/polars-expr/src/reduce/min_max.rs index 38c148c16206..00b85e99f059 100644 --- a/crates/polars-expr/src/reduce/min_max.rs +++ b/crates/polars-expr/src/reduce/min_max.rs @@ -349,24 +349,21 @@ impl GroupedReduction for BoolMinGroupedReduction { self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { let p_values = partition_mask( &self.values.freeze(), partition_sizes, partition_idxs, - group_idxs, ); let p_mask = partition_mask( &self.mask.freeze(), partition_sizes, partition_idxs, - group_idxs, ); p_values .into_iter() .zip(p_mask) - .map(|(values, mask)| Box::new(Self { values, mask }) as _) + .map(|(values, mask)| Box::new(Self { values: values.into_mut(), mask: mask.into_mut() }) as _) .collect() } @@ -480,24 +477,21 @@ impl GroupedReduction 
for BoolMaxGroupedReduction { self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { let p_values = partition_mask( &self.values.freeze(), partition_sizes, partition_idxs, - group_idxs, ); let p_mask = partition_mask( &self.mask.freeze(), partition_sizes, partition_idxs, - group_idxs, ); p_values .into_iter() .zip(p_mask) - .map(|(values, mask)| Box::new(Self { values, mask }) as _) + .map(|(values, mask)| Box::new(Self { values: values.into_mut(), mask: mask.into_mut() }) as _) .collect() } diff --git a/crates/polars-expr/src/reduce/mod.rs b/crates/polars-expr/src/reduce/mod.rs index a5199277cd83..bd1e55e8bb00 100644 --- a/crates/polars-expr/src/reduce/mod.rs +++ b/crates/polars-expr/src/reduce/mod.rs @@ -64,7 +64,6 @@ pub trait GroupedReduction: Any + Send { self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec>; /// Returns the finalized value per group as a Series. @@ -267,9 +266,8 @@ where self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { - partition::partition_vec(self.values, partition_sizes, partition_idxs, group_idxs) + partition::partition_vec(self.values, partition_sizes, partition_idxs) .into_iter() .map(|values| { Box::new(Self { @@ -393,14 +391,13 @@ where self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { - partition::partition_vec_mask(self.values, &self.mask.freeze(), partition_sizes, partition_idxs, group_idxs) + partition::partition_vec_mask(self.values, &self.mask.freeze(), partition_sizes, partition_idxs) .into_iter() .map(|(values, mask)| { Box::new(Self { values, - mask, + mask: mask.into_mut(), in_dtype: self.in_dtype.clone(), reducer: self.reducer.clone(), }) as _ diff --git a/crates/polars-expr/src/reduce/partition.rs b/crates/polars-expr/src/reduce/partition.rs index 70c7d4fea2a6..33e94e579ab5 100644 --- 
a/crates/polars-expr/src/reduce/partition.rs +++ b/crates/polars-expr/src/reduce/partition.rs @@ -1,5 +1,6 @@ -use arrow::bitmap::{Bitmap, MutableBitmap}; +use arrow::bitmap::{Bitmap, BitmapBuilder}; use polars_utils::itertools::Itertools; +use polars_utils::vec::PushUnchecked; use polars_utils::IdxSize; /// Partitions this Vec into multiple Vecs. @@ -13,9 +14,7 @@ pub unsafe fn partition_vec( v: Vec, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - idx_in_partition: &[IdxSize], ) -> Vec> { - assert!(idx_in_partition.len() == v.len()); assert!(partition_idxs.len() == v.len()); let mut partitions = partition_sizes @@ -27,11 +26,9 @@ pub unsafe fn partition_vec( // Scatter into each partition. for (i, val) in v.into_iter().enumerate() { let p_idx = *partition_idxs.get_unchecked(i) as usize; - let idx_in_p = *idx_in_partition.get_unchecked(i) as usize; debug_assert!(p_idx < partitions.len()); let p = partitions.get_unchecked_mut(p_idx); - debug_assert!(idx_in_p < p.capacity()); - p.as_mut_ptr().add(idx_in_p).write(val); + p.push_unchecked(val); } for (p, sz) in partitions.iter_mut().zip(partition_sizes) { @@ -48,25 +45,20 @@ pub unsafe fn partition_mask( m: &Bitmap, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - idx_in_partition: &[IdxSize], -) -> Vec { - assert!(idx_in_partition.len() == m.len()); +) -> Vec { assert!(partition_idxs.len() == m.len()); let mut partitions = partition_sizes .iter() - .map(|sz| MutableBitmap::from_len_zeroed(*sz as usize)) + .map(|sz| BitmapBuilder::with_capacity(*sz as usize)) .collect_vec(); unsafe { // Scatter into each partition. 
for i in 0..m.len() { let p_idx = *partition_idxs.get_unchecked(i) as usize; - let idx_in_p = *idx_in_partition.get_unchecked(i) as usize; - debug_assert!(p_idx < partitions.len()); let p = partitions.get_unchecked_mut(p_idx); - debug_assert!(idx_in_p < p.capacity()); - p.set_unchecked(idx_in_p, m.get_bit_unchecked(i)); + p.push_unchecked(m.get_bit_unchecked(i)); } } @@ -81,28 +73,22 @@ pub unsafe fn partition_vec_mask( m: &Bitmap, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - idx_in_partition: &[IdxSize], -) -> Vec<(Vec, MutableBitmap)> { - assert!(idx_in_partition.len() == v.len()); +) -> Vec<(Vec, BitmapBuilder)> { assert!(partition_idxs.len() == v.len()); assert!(m.len() == v.len()); let mut partitions = partition_sizes .iter() - .map(|sz| (Vec::::with_capacity(*sz as usize), MutableBitmap::from_len_zeroed(*sz as usize))) + .map(|sz| (Vec::::with_capacity(*sz as usize), BitmapBuilder::with_capacity(*sz as usize))) .collect_vec(); unsafe { // Scatter into each partition. 
for (i, val) in v.into_iter().enumerate() { let p_idx = *partition_idxs.get_unchecked(i) as usize; - let idx_in_p = *idx_in_partition.get_unchecked(i) as usize; - debug_assert!(p_idx < partitions.len()); let (pv, pm) = partitions.get_unchecked_mut(p_idx); - debug_assert!(idx_in_p < pv.capacity()); - debug_assert!(idx_in_p < pm.len()); - pv.as_mut_ptr().add(idx_in_p).write(val); - pm.set_unchecked(idx_in_p, m.get_bit_unchecked(i)); + pv.push_unchecked(val); + pm.push_unchecked(m.get_bit_unchecked(i)); } for (p, sz) in partitions.iter_mut().zip(partition_sizes) { diff --git a/crates/polars-expr/src/reduce/sum.rs b/crates/polars-expr/src/reduce/sum.rs index 0e36b038c331..466d5ffb9f9d 100644 --- a/crates/polars-expr/src/reduce/sum.rs +++ b/crates/polars-expr/src/reduce/sum.rs @@ -130,9 +130,8 @@ where self: Box, partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], - group_idxs: &[IdxSize], ) -> Vec> { - partition::partition_vec(self.sums, partition_sizes, partition_idxs, group_idxs) + partition::partition_vec(self.sums, partition_sizes, partition_idxs) .into_iter() .map(|sums| { Box::new(Self { diff --git a/crates/polars-stream/src/nodes/group_by.rs b/crates/polars-stream/src/nodes/group_by.rs index eb03ef45ef85..e9ca1fd3eb21 100644 --- a/crates/polars-stream/src/nodes/group_by.rs +++ b/crates/polars-stream/src/nodes/group_by.rs @@ -119,20 +119,18 @@ impl GroupBySinkState { .into_par_iter() .with_max_len(1) .map(|local| { - let mut group_idxs = Vec::new(); let mut partition_idxs = Vec::new(); let p_groupers = local.grouper.partition( seed, num_partitions, &mut partition_idxs, - &mut group_idxs, ); let partition_sizes = p_groupers.iter().map(|g| g.num_groups()).collect_vec(); let grouped_reductions_p = local .grouped_reductions .into_iter() .map(|r| unsafe { - r.partition(&partition_sizes, &partition_idxs, &group_idxs) + r.partition(&partition_sizes, &partition_idxs) }) .collect_vec(); (p_groupers, grouped_reductions_p) From 21ad2b77d51dd7d2969be4e19724b2581a716d0b 
Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Fri, 25 Oct 2024 15:51:17 +0200 Subject: [PATCH 4/4] fmt --- crates/polars-arrow/src/bitmap/builder.rs | 18 +++---- crates/polars-arrow/src/bitmap/mod.rs | 2 +- crates/polars-expr/src/groups/row_encoded.rs | 15 ++++-- crates/polars-expr/src/reduce/len.rs | 5 +- crates/polars-expr/src/reduce/min_max.rs | 38 +++++++-------- crates/polars-expr/src/reduce/mod.rs | 27 ++++++----- crates/polars-expr/src/reduce/partition.rs | 13 ++++-- crates/polars-stream/src/nodes/group_by.rs | 49 +++++++++++--------- 8 files changed, 90 insertions(+), 77 deletions(-) diff --git a/crates/polars-arrow/src/bitmap/builder.rs b/crates/polars-arrow/src/bitmap/builder.rs index 40e9ebcd1827..c507df97c5ba 100644 --- a/crates/polars-arrow/src/bitmap/builder.rs +++ b/crates/polars-arrow/src/bitmap/builder.rs @@ -15,11 +15,11 @@ impl BitmapBuilder { pub fn new() -> Self { Self::default() } - + pub fn len(&self) -> usize { self.len } - + pub fn capacity(&self) -> usize { self.cap } @@ -32,17 +32,17 @@ impl BitmapBuilder { len: 0, cap: words_available * 64, set_bits: 0, - bytes + bytes, } } - + #[inline(always)] pub fn reserve(&mut self, additional: usize) { if self.len + additional > self.cap { self.reserve_slow(additional) } } - + #[cold] #[inline(never)] fn reserve_slow(&mut self, additional: usize) { @@ -51,7 +51,7 @@ impl BitmapBuilder { let words_available = self.bytes.capacity() / 8; self.cap = words_available * 64; } - + #[inline(always)] pub fn push(&mut self, x: bool) { self.reserve(1); @@ -73,7 +73,7 @@ impl BitmapBuilder { self.buf = 0; } } - + /// # Safety /// May only be called once at the end. 
unsafe fn finish(&mut self) { @@ -82,7 +82,7 @@ impl BitmapBuilder { self.set_bits += self.buf.count_ones() as usize; } } - + pub fn into_mut(mut self) -> MutableBitmap { unsafe { self.finish(); @@ -97,4 +97,4 @@ impl BitmapBuilder { Bitmap::from_inner_unchecked(storage, 0, self.len, Some(self.len - self.set_bits)) } } -} \ No newline at end of file +} diff --git a/crates/polars-arrow/src/bitmap/mod.rs b/crates/polars-arrow/src/bitmap/mod.rs index 2191fb2e3faa..6d518bf596b4 100644 --- a/crates/polars-arrow/src/bitmap/mod.rs +++ b/crates/polars-arrow/src/bitmap/mod.rs @@ -21,4 +21,4 @@ pub mod utils; pub mod bitmask; mod builder; -pub use builder::*; \ No newline at end of file +pub use builder::*; diff --git a/crates/polars-expr/src/groups/row_encoded.rs b/crates/polars-expr/src/groups/row_encoded.rs index 52b1c3aaa348..1a2fd5209436 100644 --- a/crates/polars-expr/src/groups/row_encoded.rs +++ b/crates/polars-expr/src/groups/row_encoded.rs @@ -31,7 +31,7 @@ pub struct RowEncodedHashGrouper { key_data: Vec, // Used for computing canonical hashes. - random_state: PlRandomState, + random_state: PlRandomState, // Internal random seed used to keep hash iteration order decorrelated. // We simply store a random odd number and multiply the canonical hash by it. @@ -83,7 +83,10 @@ impl RowEncodedHashGrouper { group_idx, }; self.key_data.extend(key); - self.table.insert_unique(hash.wrapping_mul(self.seed), group, |g| g.key_hash.wrapping_mul(self.seed)); + self.table + .insert_unique(hash.wrapping_mul(self.seed), group, |g| { + g.key_hash.wrapping_mul(self.seed) + }); group_idx } @@ -149,7 +152,8 @@ impl Grouper for RowEncodedHashGrouper { let other = other.as_any().downcast_ref::().unwrap(); // TODO: cardinality estimation. 
- self.table.reserve(other.table.len(), |g| g.key_hash.wrapping_mul(self.seed)); + self.table + .reserve(other.table.len(), |g| g.key_hash.wrapping_mul(self.seed)); unsafe { group_idxs.clear(); @@ -232,11 +236,12 @@ impl Grouper for RowEncodedHashGrouper { let p_idx = hash_to_partition(ph, num_partitions); let p = partitions.get_unchecked_mut(p_idx); p.insert_key_unique(group.key_hash, group.key(&self.key_data)); - *partition_idxs_out.get_unchecked_mut(group.group_idx as usize) = MaybeUninit::new(p_idx as IdxSize); + *partition_idxs_out.get_unchecked_mut(group.group_idx as usize) = + MaybeUninit::new(p_idx as IdxSize); } partition_idxs.set_len(self.table.len()); } - + partitions.into_iter().map(|p| Box::new(p) as _).collect() } diff --git a/crates/polars-expr/src/reduce/len.rs b/crates/polars-expr/src/reduce/len.rs index 89bd4ca3a5aa..57641b1a02b6 100644 --- a/crates/polars-expr/src/reduce/len.rs +++ b/crates/polars-expr/src/reduce/len.rs @@ -1,8 +1,7 @@ use polars_core::error::constants::LENGTH_LIMIT_MSG; -use crate::reduce::partition::partition_vec; - use super::*; +use crate::reduce::partition::partition_vec; #[derive(Default)] pub struct LenReduce { @@ -62,7 +61,7 @@ impl GroupedReduction for LenReduce { .collect_ca(PlSmallStr::EMPTY); Ok(ca.into_series()) } - + unsafe fn partition( self: Box, partition_sizes: &[IdxSize], diff --git a/crates/polars-expr/src/reduce/min_max.rs b/crates/polars-expr/src/reduce/min_max.rs index 00b85e99f059..de25d3efc927 100644 --- a/crates/polars-expr/src/reduce/min_max.rs +++ b/crates/polars-expr/src/reduce/min_max.rs @@ -350,20 +350,17 @@ impl GroupedReduction for BoolMinGroupedReduction { partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], ) -> Vec> { - let p_values = partition_mask( - &self.values.freeze(), - partition_sizes, - partition_idxs, - ); - let p_mask = partition_mask( - &self.mask.freeze(), - partition_sizes, - partition_idxs, - ); + let p_values = partition_mask(&self.values.freeze(), partition_sizes, 
partition_idxs); + let p_mask = partition_mask(&self.mask.freeze(), partition_sizes, partition_idxs); p_values .into_iter() .zip(p_mask) - .map(|(values, mask)| Box::new(Self { values: values.into_mut(), mask: mask.into_mut() }) as _) + .map(|(values, mask)| { + Box::new(Self { + values: values.into_mut(), + mask: mask.into_mut(), + }) as _ + }) .collect() } @@ -478,20 +475,17 @@ impl GroupedReduction for BoolMaxGroupedReduction { partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], ) -> Vec> { - let p_values = partition_mask( - &self.values.freeze(), - partition_sizes, - partition_idxs, - ); - let p_mask = partition_mask( - &self.mask.freeze(), - partition_sizes, - partition_idxs, - ); + let p_values = partition_mask(&self.values.freeze(), partition_sizes, partition_idxs); + let p_mask = partition_mask(&self.mask.freeze(), partition_sizes, partition_idxs); p_values .into_iter() .zip(p_mask) - .map(|(values, mask)| Box::new(Self { values: values.into_mut(), mask: mask.into_mut() }) as _) + .map(|(values, mask)| { + Box::new(Self { + values: values.into_mut(), + mask: mask.into_mut(), + }) as _ + }) .collect() } diff --git a/crates/polars-expr/src/reduce/mod.rs b/crates/polars-expr/src/reduce/mod.rs index bd1e55e8bb00..bfe4cb56417b 100644 --- a/crates/polars-expr/src/reduce/mod.rs +++ b/crates/polars-expr/src/reduce/mod.rs @@ -392,17 +392,22 @@ where partition_sizes: &[IdxSize], partition_idxs: &[IdxSize], ) -> Vec> { - partition::partition_vec_mask(self.values, &self.mask.freeze(), partition_sizes, partition_idxs) - .into_iter() - .map(|(values, mask)| { - Box::new(Self { - values, - mask: mask.into_mut(), - in_dtype: self.in_dtype.clone(), - reducer: self.reducer.clone(), - }) as _ - }) - .collect() + partition::partition_vec_mask( + self.values, + &self.mask.freeze(), + partition_sizes, + partition_idxs, + ) + .into_iter() + .map(|(values, mask)| { + Box::new(Self { + values, + mask: mask.into_mut(), + in_dtype: self.in_dtype.clone(), + reducer: 
self.reducer.clone(), + }) as _ + }) + .collect() } fn finalize(&mut self) -> PolarsResult { diff --git a/crates/polars-expr/src/reduce/partition.rs b/crates/polars-expr/src/reduce/partition.rs index 33e94e579ab5..0152035879bd 100644 --- a/crates/polars-expr/src/reduce/partition.rs +++ b/crates/polars-expr/src/reduce/partition.rs @@ -16,7 +16,7 @@ pub unsafe fn partition_vec( partition_idxs: &[IdxSize], ) -> Vec> { assert!(partition_idxs.len() == v.len()); - + let mut partitions = partition_sizes .iter() .map(|sz| Vec::::with_capacity(*sz as usize)) @@ -47,7 +47,7 @@ pub unsafe fn partition_mask( partition_idxs: &[IdxSize], ) -> Vec { assert!(partition_idxs.len() == m.len()); - + let mut partitions = partition_sizes .iter() .map(|sz| BitmapBuilder::with_capacity(*sz as usize)) @@ -76,10 +76,15 @@ pub unsafe fn partition_vec_mask( ) -> Vec<(Vec, BitmapBuilder)> { assert!(partition_idxs.len() == v.len()); assert!(m.len() == v.len()); - + let mut partitions = partition_sizes .iter() - .map(|sz| (Vec::::with_capacity(*sz as usize), BitmapBuilder::with_capacity(*sz as usize))) + .map(|sz| { + ( + Vec::::with_capacity(*sz as usize), + BitmapBuilder::with_capacity(*sz as usize), + ) + }) .collect_vec(); unsafe { diff --git a/crates/polars-stream/src/nodes/group_by.rs b/crates/polars-stream/src/nodes/group_by.rs index e9ca1fd3eb21..9827268a4ff5 100644 --- a/crates/polars-stream/src/nodes/group_by.rs +++ b/crates/polars-stream/src/nodes/group_by.rs @@ -81,7 +81,7 @@ impl GroupBySinkState { })); } } - + fn combine_locals( output_schema: &Schema, mut locals: Vec, @@ -120,39 +120,44 @@ impl GroupBySinkState { .with_max_len(1) .map(|local| { let mut partition_idxs = Vec::new(); - let p_groupers = local.grouper.partition( - seed, - num_partitions, - &mut partition_idxs, - ); + let p_groupers = local + .grouper + .partition(seed, num_partitions, &mut partition_idxs); let partition_sizes = p_groupers.iter().map(|g| g.num_groups()).collect_vec(); let grouped_reductions_p = local 
.grouped_reductions .into_iter() - .map(|r| unsafe { - r.partition(&partition_sizes, &partition_idxs) - }) + .map(|r| unsafe { r.partition(&partition_sizes, &partition_idxs) }) .collect_vec(); (p_groupers, grouped_reductions_p) }) .collect(); - + let frames = unsafe { let mut partitioned_locals = ManuallyDrop::new(partitioned_locals); let partitioned_locals_ptr = SyncPtr::new(partitioned_locals.as_mut_ptr()); - (0..num_partitions).into_par_iter().with_max_len(1).map(|p| { - let locals_in_p = (0..num_partitions).map(|l| { - let partitioned_local = &*partitioned_locals_ptr.get().add(l); - let (p_groupers, grouped_reductions_p) = partitioned_local; - LocalGroupBySinkState { - grouper: p_groupers.as_ptr().add(p).read(), - grouped_reductions: grouped_reductions_p.iter().map(|r| r.as_ptr().add(p).read()).collect(), - } - }).collect(); - Self::combine_locals(output_schema, locals_in_p) - }).collect::>>() + (0..num_partitions) + .into_par_iter() + .with_max_len(1) + .map(|p| { + let locals_in_p = (0..num_partitions) + .map(|l| { + let partitioned_local = &*partitioned_locals_ptr.get().add(l); + let (p_groupers, grouped_reductions_p) = partitioned_local; + LocalGroupBySinkState { + grouper: p_groupers.as_ptr().add(p).read(), + grouped_reductions: grouped_reductions_p + .iter() + .map(|r| r.as_ptr().add(p).read()) + .collect(), + } + }) + .collect(); + Self::combine_locals(output_schema, locals_in_p) + }) + .collect::>>() }; - + let df = accumulate_dataframes_vertical_unchecked(frames?); let mut source_node = InMemorySourceNode::new(Arc::new(df)); source_node.initialize(num_partitions);