Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): Add groupby partitioning and parallel groupby finishing to new-streaming engine #19451

Merged
merged 4 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 100 additions & 0 deletions crates/polars-arrow/src/bitmap/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use crate::bitmap::{Bitmap, MutableBitmap};
use crate::storage::SharedStorage;

/// Used to build bitmaps bool-by-bool in sequential order.
#[derive(Default, Clone)]
pub struct BitmapBuilder {
buf: u64,
len: usize,
cap: usize,
set_bits: usize,
bytes: Vec<u8>,
}

impl BitmapBuilder {
pub fn new() -> Self {
Self::default()
}

pub fn len(&self) -> usize {
self.len
}

pub fn capacity(&self) -> usize {
self.cap
}

pub fn with_capacity(bits: usize) -> Self {
let bytes = Vec::with_capacity(bits.div_ceil(64) * 8);
let words_available = bytes.capacity() / 8;
Self {
buf: 0,
len: 0,
cap: words_available * 64,
set_bits: 0,
bytes,
}
}

#[inline(always)]
pub fn reserve(&mut self, additional: usize) {
if self.len + additional > self.cap {
self.reserve_slow(additional)
}
}

#[cold]
#[inline(never)]
fn reserve_slow(&mut self, additional: usize) {
let bytes_needed = (self.len + additional).div_ceil(64) * 8;
self.bytes.reserve(bytes_needed - self.bytes.capacity());
let words_available = self.bytes.capacity() / 8;
self.cap = words_available * 64;
}

#[inline(always)]
pub fn push(&mut self, x: bool) {
self.reserve(1);
unsafe { self.push_unchecked(x) }
}

/// # Safety
/// self.len() < self.capacity() must hold.
#[inline(always)]
pub unsafe fn push_unchecked(&mut self, x: bool) {
debug_assert!(self.len < self.cap);
self.buf |= (x as u64) << (self.len % 64);
self.len += 1;
if self.len % 64 == 0 {
let p = self.bytes.as_mut_ptr().add(self.bytes.len()).cast::<u64>();
p.write_unaligned(self.buf.to_le());
self.bytes.set_len(self.bytes.len() + 8);
self.set_bits += self.buf.count_ones() as usize;
self.buf = 0;
}
}

/// # Safety
/// May only be called once at the end.
unsafe fn finish(&mut self) {
if self.len % 64 != 0 {
self.bytes.extend_from_slice(&self.buf.to_le_bytes());
self.set_bits += self.buf.count_ones() as usize;
}
}

pub fn into_mut(mut self) -> MutableBitmap {
unsafe {
self.finish();
MutableBitmap::from_vec(self.bytes, self.len)
}
}

pub fn freeze(mut self) -> Bitmap {
unsafe {
self.finish();
let storage = SharedStorage::from_vec(self.bytes);
Bitmap::from_inner_unchecked(storage, 0, self.len, Some(self.len - self.set_bits))
}
}
}
3 changes: 3 additions & 0 deletions crates/polars-arrow/src/bitmap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ pub use assign_ops::*;
pub mod utils;

pub mod bitmask;

mod builder;
pub use builder::*;
6 changes: 1 addition & 5 deletions crates/polars-core/src/hashing/vector_hasher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arrow::bitmap::utils::get_bit_unchecked;
use polars_utils::hashing::folded_multiply;
use polars_utils::total_ord::{ToTotalOrd, TotalHash};
use rayon::prelude::*;
use xxhash_rust::xxh3::xxh3_64_with_seed;
Expand Down Expand Up @@ -30,11 +31,6 @@ pub trait VecHash {
}
}

pub(crate) const fn folded_multiply(s: u64, by: u64) -> u64 {
let result = (s as u128).wrapping_mul(by as u128);
((result & 0xffff_ffff_ffff_ffff) as u64) ^ ((result >> 64) as u64)
}

pub(crate) fn get_null_hash_value(random_state: &PlRandomState) -> u64 {
// we just start with a large prime number and hash that twice
// to get a constant hash value for null/None
Expand Down
1 change: 1 addition & 0 deletions crates/polars-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ polars-plan = { workspace = true }
polars-row = { workspace = true }
polars-time = { workspace = true, optional = true }
polars-utils = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }

[features]
Expand Down
13 changes: 6 additions & 7 deletions crates/polars-expr/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@ pub trait Grouper: Any + Send {
/// the ith group of other now has group index group_idxs[i] in self.
fn combine(&mut self, other: &dyn Grouper, group_idxs: &mut Vec<IdxSize>);

/// Partitions this Grouper into the given partitions.
/// Partitions this Grouper into the given number of partitions.
///
/// Updates partition_idxs and group_idxs such that the ith group of self
/// has group index group_idxs[i] in partition partition_idxs[i].
/// Updates partition_idxs such that the ith group of self moves to partition
/// partition_idxs[i].
///
/// It is guaranteed that two equal keys in two independent partition_into
/// calls map to the same partition index if the seed and the number of
/// partitions is equal.
fn partition_into(
fn partition(
&self,
seed: u64,
partitions: &mut [Box<dyn Grouper>],
num_partitions: usize,
partition_idxs: &mut Vec<IdxSize>,
group_idxs: &mut Vec<IdxSize>,
);
) -> Vec<Box<dyn Grouper>>;

/// Returns the keys in this Grouper in group order, that is the key for
/// group i is returned in row i.
Expand Down
88 changes: 78 additions & 10 deletions crates/polars-expr/src/groups/row_encoded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use hashbrown::hash_table::{Entry, HashTable};
use polars_core::chunked_array::ops::row_encode::_get_rows_encoded_unordered;
use polars_row::EncodingField;
use polars_utils::aliases::PlRandomState;
use polars_utils::hashing::{folded_multiply, hash_to_partition};
use polars_utils::itertools::Itertools;
use polars_utils::vec::PushUnchecked;
use rand::Rng;

use super::*;

Expand All @@ -27,24 +29,31 @@ pub struct RowEncodedHashGrouper {
key_schema: Arc<Schema>,
table: HashTable<Group>,
key_data: Vec<u8>,

// Used for computing canonical hashes.
random_state: PlRandomState,

// Internal random seed used to keep hash iteration order decorrelated.
// We simply store a random odd number and multiply the canonical hash by it.
seed: u64,
}

impl RowEncodedHashGrouper {
pub fn new(key_schema: Arc<Schema>, random_state: PlRandomState) -> Self {
Self {
key_schema,
random_state,
seed: rand::random::<u64>() | 1,
..Default::default()
}
}

fn insert_key(&mut self, hash: u64, key: &[u8]) -> IdxSize {
let num_groups = self.table.len();
let entry = self.table.entry(
hash,
hash.wrapping_mul(self.seed),
|g| unsafe { hash == g.key_hash && key == g.key(&self.key_data) },
|g| g.key_hash,
|g| g.key_hash.wrapping_mul(self.seed),
);

match entry {
Expand All @@ -64,6 +73,23 @@ impl RowEncodedHashGrouper {
}
}

/// Insert a key, without checking that it is unique.
fn insert_key_unique(&mut self, hash: u64, key: &[u8]) -> IdxSize {
let group_idx = self.table.len().try_into().unwrap();
let group = Group {
key_hash: hash,
key_offset: self.key_data.len(),
key_length: key.len().try_into().unwrap(),
group_idx,
};
self.key_data.extend(key);
self.table
.insert_unique(hash.wrapping_mul(self.seed), group, |g| {
g.key_hash.wrapping_mul(self.seed)
});
group_idx
}

fn finalize_keys(&self, mut key_rows: Vec<&[u8]>) -> DataFrame {
let key_dtypes = self
.key_schema
Expand Down Expand Up @@ -125,7 +151,9 @@ impl Grouper for RowEncodedHashGrouper {
fn combine(&mut self, other: &dyn Grouper, group_idxs: &mut Vec<IdxSize>) {
let other = other.as_any().downcast_ref::<Self>().unwrap();

self.table.reserve(other.table.len(), |g| g.key_hash); // TODO: cardinality estimation.
// TODO: cardinality estimation.
self.table
.reserve(other.table.len(), |g| g.key_hash.wrapping_mul(self.seed));

unsafe {
group_idxs.clear();
Expand Down Expand Up @@ -167,14 +195,54 @@ impl Grouper for RowEncodedHashGrouper {
)
}

fn partition_into(
fn partition(
&self,
_seed: u64,
_partitions: &mut [Box<dyn Grouper>],
_partition_idxs: &mut Vec<IdxSize>,
_group_idxs: &mut Vec<IdxSize>,
) {
unimplemented!()
seed: u64,
num_partitions: usize,
partition_idxs: &mut Vec<IdxSize>,
) -> Vec<Box<dyn Grouper>> {
assert!(num_partitions > 0);

// Two-pass algorithm to prevent reallocations.
let mut partition_size = vec![(0, 0); num_partitions]; // (keys, bytes)
unsafe {
for group in self.table.iter() {
let ph = folded_multiply(group.key_hash, seed | 1);
let p_idx = hash_to_partition(ph, num_partitions);
let (p_keys, p_bytes) = partition_size.get_unchecked_mut(p_idx as usize);
*p_keys += 1;
*p_bytes += group.key_length as usize;
}
}

let mut rng = rand::thread_rng();
let mut partitions = partition_size
.into_iter()
.map(|(keys, bytes)| Self {
key_schema: self.key_schema.clone(),
table: HashTable::with_capacity(keys),
key_data: Vec::with_capacity(bytes),
random_state: self.random_state.clone(),
seed: rng.gen::<u64>() | 1,
})
.collect_vec();

unsafe {
partition_idxs.clear();
partition_idxs.reserve(self.table.len());
let partition_idxs_out = partition_idxs.spare_capacity_mut();
for group in self.table.iter() {
let ph = folded_multiply(group.key_hash, seed | 1);
let p_idx = hash_to_partition(ph, num_partitions);
let p = partitions.get_unchecked_mut(p_idx);
p.insert_key_unique(group.key_hash, group.key(&self.key_data));
*partition_idxs_out.get_unchecked_mut(group.group_idx as usize) =
MaybeUninit::new(p_idx as IdxSize);
}
partition_idxs.set_len(self.table.len());
}

partitions.into_iter().map(|p| Box::new(p) as _).collect()
}

fn as_any(&self) -> &dyn Any {
Expand Down
12 changes: 12 additions & 0 deletions crates/polars-expr/src/reduce/len.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use polars_core::error::constants::LENGTH_LIMIT_MSG;

use super::*;
use crate::reduce::partition::partition_vec;

#[derive(Default)]
pub struct LenReduce {
Expand Down Expand Up @@ -61,6 +62,17 @@ impl GroupedReduction for LenReduce {
Ok(ca.into_series())
}

unsafe fn partition(
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
partition_vec(self.groups, partition_sizes, partition_idxs)
.into_iter()
.map(|groups| Box::new(Self { groups }) as _)
.collect()
}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down
39 changes: 39 additions & 0 deletions crates/polars-expr/src/reduce/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use polars_utils::float::IsFloat;
use polars_utils::min_max::MinMax;

use super::*;
use crate::reduce::partition::partition_mask;

pub fn new_min_reduction(dtype: DataType, propagate_nans: bool) -> Box<dyn GroupedReduction> {
use DataType::*;
Expand Down Expand Up @@ -344,6 +345,25 @@ impl GroupedReduction for BoolMinGroupedReduction {
Ok(())
}

unsafe fn partition(
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
let p_values = partition_mask(&self.values.freeze(), partition_sizes, partition_idxs);
let p_mask = partition_mask(&self.mask.freeze(), partition_sizes, partition_idxs);
p_values
.into_iter()
.zip(p_mask)
.map(|(values, mask)| {
Box::new(Self {
values: values.into_mut(),
mask: mask.into_mut(),
}) as _
})
.collect()
}

fn finalize(&mut self) -> PolarsResult<Series> {
let v = core::mem::take(&mut self.values);
let m = core::mem::take(&mut self.mask);
Expand Down Expand Up @@ -450,6 +470,25 @@ impl GroupedReduction for BoolMaxGroupedReduction {
})
}

unsafe fn partition(
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
let p_values = partition_mask(&self.values.freeze(), partition_sizes, partition_idxs);
let p_mask = partition_mask(&self.mask.freeze(), partition_sizes, partition_idxs);
p_values
.into_iter()
.zip(p_mask)
.map(|(values, mask)| {
Box::new(Self {
values: values.into_mut(),
mask: mask.into_mut(),
}) as _
})
.collect()
}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down
Loading