Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(rust, python): use sorted fast path in streaming groupby #5727

Merged
merged 1 commit into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 48 additions & 36 deletions polars/polars-arrow/src/kernels/sort_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,53 @@ where
out
}

pub fn partition_to_groups_amortized<T>(
values: &[T],
first_group_offset: IdxSize,
nulls_first: bool,
offset: IdxSize,
out: &mut Vec<[IdxSize; 2]>,
) where
T: Debug + NativeType + PartialOrd,
{
if let Some(mut first) = values.get(0) {
out.clear();
if nulls_first && first_group_offset > 0 {
out.push([0, first_group_offset])
}

let mut first_idx = if nulls_first { first_group_offset } else { 0 } + offset;

for val in values {
// new group reached
if val != first {
let val_ptr = val as *const T;
let first_ptr = first as *const T;

// Safety
// all pointers suffice the invariants
let len = unsafe { val_ptr.offset_from(first_ptr) } as IdxSize;
out.push([first_idx, len]);
first_idx += len;
first = val;
}
}
// add last group
if nulls_first {
out.push([
first_idx,
values.len() as IdxSize + first_group_offset - first_idx,
]);
} else {
out.push([first_idx, values.len() as IdxSize - (first_idx - offset)]);
}

if !nulls_first && first_group_offset > 0 {
out.push([values.len() as IdxSize + offset, first_group_offset])
}
}
}

/// Take a clean-partitioned slice and return the groups slices
/// With clean-partitioned we mean that the slice contains all groups and are not spilled to another partition.
///
Expand All @@ -84,43 +131,8 @@ where
if values.is_empty() {
return vec![];
}
let mut first = values.get(0).unwrap();
let mut out = Vec::with_capacity(values.len() / 10);

if nulls_first && first_group_offset > 0 {
out.push([0, first_group_offset])
}

let mut first_idx = if nulls_first { first_group_offset } else { 0 } + offset;

for val in values {
// new group reached
if val != first {
let val_ptr = val as *const T;
let first_ptr = first as *const T;

// Safety
// all pointers suffice the invariants
let len = unsafe { val_ptr.offset_from(first_ptr) } as IdxSize;
out.push([first_idx, len]);
first_idx += len;
first = val;
}
}
// add last group
if nulls_first {
out.push([
first_idx,
values.len() as IdxSize + first_group_offset - first_idx,
]);
} else {
out.push([first_idx, values.len() as IdxSize - (first_idx - offset)]);
}

if !nulls_first && first_group_offset > 0 {
out.push([values.len() as IdxSize + offset, first_group_offset])
}

partition_to_groups_amortized(values, first_group_offset, nulls_first, offset, &mut out);
out
}

Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/datatypes/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ impl DataType {
}

/// Convert to an Arrow data type.
#[inline]
pub fn to_arrow(&self) -> ArrowDataType {
use DataType::*;
match self {
Expand Down
4 changes: 3 additions & 1 deletion polars/polars-core/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ macro_rules! impl_polars_datatype {
pub struct $ca {}

impl PolarsDataType for $ca {
#[inline]
fn get_dtype() -> DataType {
DataType::$variant
}
Expand Down Expand Up @@ -178,8 +179,9 @@ pub trait NumericNative:
+ IsFloat
+ NativeArithmetics
{
type POLARSTYPE;
type POLARSTYPE: PolarsNumericType;
}

impl NumericNative for i8 {
type POLARSTYPE = Int8Type;
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ pub use crate::series::{IntoSeries, Series, SeriesTrait};
pub use crate::testing::*;
pub(crate) use crate::utils::CustomIterTools;
pub use crate::utils::IntoVec;
pub use crate::vector_hasher::VecHash;
pub use crate::vector_hasher::{VecHash, VecHashSingle};
pub use crate::{datatypes, df};
6 changes: 4 additions & 2 deletions polars/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,11 @@ fn flatten_df(df: &DataFrame) -> impl Iterator<Item = DataFrame> + '_ {
.map(|(s, arr)| {
// Safety:
// datatypes are correct
unsafe {
let mut out = unsafe {
Series::from_chunks_and_dtype_unchecked(s.name(), vec![arr], s.dtype())
}
};
out.set_sorted(s.is_sorted());
out
})
.collect(),
);
Expand Down
70 changes: 68 additions & 2 deletions polars/polars-core/src/vector_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ macro_rules! fx_hash_64_bit {
($val as u64).wrapping_mul($k)
}};
}
const FXHASH_K: u64 = 0x517cc1b727220a95;

fn finish_vec_hash<T>(ca: &ChunkedArray<T>, random_state: RandomState, buf: &mut Vec<u64>)
where
Expand Down Expand Up @@ -142,8 +143,7 @@ macro_rules! vec_hash_int {
buf.clear();
buf.reserve(self.len());

let k: u64 = 0x517cc1b727220a95;
let k = random_state.hash_one(k);
let k = random_state.hash_one(FXHASH_K);

#[allow(unused_unsafe)]
#[allow(clippy::useless_transmute)]
Expand Down Expand Up @@ -175,6 +175,72 @@ vec_hash_int!(UInt32Chunked, fx_hash_32_bit);
vec_hash_int!(UInt16Chunked, fx_hash_16_bit);
vec_hash_int!(UInt8Chunked, fx_hash_8_bit);

/// Ensure that the same hash is used as with `VecHash`.
pub trait VecHashSingle {
fn get_k(random_state: RandomState) -> u64 {
random_state.hash_one(FXHASH_K)
}
fn _vec_hash_single(self, k: u64) -> u64;
}
impl VecHashSingle for i8 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
unsafe { fx_hash_8_bit!(self, k) }
}
}
impl VecHashSingle for u8 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
#[allow(clippy::useless_transmute)]
unsafe {
fx_hash_8_bit!(self, k)
}
}
}
impl VecHashSingle for i16 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
unsafe { fx_hash_16_bit!(self, k) }
}
}
impl VecHashSingle for u16 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
#[allow(clippy::useless_transmute)]
unsafe {
fx_hash_16_bit!(self, k)
}
}
}

impl VecHashSingle for i32 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
unsafe { fx_hash_32_bit!(self, k) }
}
}
impl VecHashSingle for u32 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
#[allow(clippy::useless_transmute)]
unsafe {
fx_hash_32_bit!(self, k)
}
}
}
impl VecHashSingle for i64 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
fx_hash_64_bit!(self, k)
}
}
impl VecHashSingle for u64 {
#[inline]
fn _vec_hash_single(self, k: u64) -> u64 {
fx_hash_64_bit!(self, k)
}
}

impl VecHash for Utf8Chunked {
fn vec_hash(&self, random_state: RandomState, buf: &mut Vec<u64>) {
buf.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub fn can_convert_to_hash_agg(
}
}

pub fn convert_to_hash_agg<F>(
pub(crate) fn convert_to_hash_agg<F>(
node: Node,
expr_arena: &Arena<AExpr>,
schema: &SchemaRef,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::any::Any;

use polars_core::datatypes::{AnyValue, DataType};
use polars_core::prelude::IDX_DTYPE;
use polars_core::prelude::{Series, IDX_DTYPE};
use polars_utils::unwrap::UnwrapUncheckedRelease;

use super::*;
use crate::operators::IdxSize;

pub struct CountAgg {
pub(crate) struct CountAgg {
count: IdxSize,
}

Expand All @@ -28,6 +28,15 @@ impl AggregateFn for CountAgg {
fn pre_agg(&mut self, _chunk_idx: IdxSize, _item: &mut dyn ExactSizeIterator<Item = AnyValue>) {
self.incr();
}
fn pre_agg_ordered(
&mut self,
_chunk_idx: IdxSize,
_offset: IdxSize,
length: IdxSize,
_values: &Series,
) {
self.count += length
}

fn dtype(&self) -> DataType {
IDX_DTYPE
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::any::Any;

use polars_core::datatypes::DataType;
use polars_core::prelude::AnyValue;
use polars_core::prelude::{AnyValue, Series};
use polars_utils::unwrap::UnwrapUncheckedRelease;

use crate::executors::sinks::groupby::aggregates::AggregateFn;
use crate::operators::IdxSize;

pub struct FirstAgg {
pub(crate) struct FirstAgg {
chunk_idx: IdxSize,
first: Option<AnyValue<'static>>,
pub(crate) dtype: DataType,
Expand All @@ -31,6 +31,22 @@ impl AggregateFn for FirstAgg {
self.first = Some(item.into_static().unwrap())
}
}
fn pre_agg_ordered(
&mut self,
chunk_idx: IdxSize,
offset: IdxSize,
_length: IdxSize,
values: &Series,
) {
if self.first.is_none() {
self.chunk_idx = chunk_idx;
self.first = Some(
unsafe { values.get_unchecked(offset as usize) }
.into_static()
.unwrap(),
)
}
}

fn dtype(&self) -> DataType {
self.dtype.clone()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::cmp::Ordering;

use enum_dispatch::enum_dispatch;
use polars_core::datatypes::DataType;
use polars_core::prelude::AnyValue;
use polars_core::prelude::{AnyValue, Series};

use crate::executors::sinks::groupby::aggregates::count::CountAgg;
use crate::executors::sinks::groupby::aggregates::first::FirstAgg;
Expand All @@ -15,16 +15,21 @@ use crate::executors::sinks::groupby::aggregates::SumAgg;
use crate::operators::IdxSize;

#[enum_dispatch(AggregateFunction)]
pub trait AggregateFn: Send + Sync {
pub(crate) trait AggregateFn: Send + Sync {
fn has_physical_agg(&self) -> bool {
false
}
fn pre_agg(&mut self, _chunk_idx: IdxSize, item: &mut dyn ExactSizeIterator<Item = AnyValue>);

fn pre_agg_ordered(
&mut self,
_chunk_idx: IdxSize,
offset: IdxSize,
length: IdxSize,
values: &Series,
);
fn pre_agg_u8(&mut self, _chunk_idx: IdxSize, _item: Option<u8>) {
unimplemented!()
}

fn pre_agg_u16(&mut self, _chunk_idx: IdxSize, _item: Option<u16>) {
unimplemented!()
}
Expand Down Expand Up @@ -65,7 +70,7 @@ pub trait AggregateFn: Send + Sync {
// We dispatch via an enum
// as that saves an indirection
#[enum_dispatch]
pub enum AggregateFunction {
pub(crate) enum AggregateFunction {
First(FirstAgg),
Last(LastAgg),
Count(CountAgg),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::any::Any;

use polars_core::datatypes::DataType;
use polars_core::prelude::AnyValue;
use polars_core::prelude::{AnyValue, Series};
use polars_utils::unwrap::UnwrapUncheckedRelease;

use crate::executors::sinks::groupby::aggregates::AggregateFn;
use crate::operators::IdxSize;

pub struct LastAgg {
pub(crate) struct LastAgg {
chunk_idx: IdxSize,
last: AnyValue<'static>,
pub(crate) dtype: DataType,
Expand All @@ -29,6 +29,21 @@ impl AggregateFn for LastAgg {
self.chunk_idx = chunk_idx;
self.last = unsafe { item.into_static().unwrap_unchecked() };
}
fn pre_agg_ordered(
&mut self,
chunk_idx: IdxSize,
offset: IdxSize,
length: IdxSize,
values: &Series,
) {
self.chunk_idx = chunk_idx;
self.last = unsafe {
values
.get_unchecked((offset + length - 1) as usize)
.into_static()
.unwrap_unchecked()
}
}

fn dtype(&self) -> DataType {
self.dtype.clone()
Expand Down
Loading