From 4094f07d19bb93eb7c54a3ca1e0c272fcc07af0e Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Tue, 3 Dec 2024 14:48:40 +0100 Subject: [PATCH] chore: Move horizontal methods to polars-ops --- .../src/chunked_array/ops/min_max_binary.rs | 81 ---- .../polars-core/src/chunked_array/ops/mod.rs | 2 - crates/polars-core/src/frame/mod.rs | 68 ---- .../src/series/arithmetic/borrowed.rs | 2 +- .../src/series/arithmetic/horizontal.rs | 223 ----------- .../polars-core/src/series/arithmetic/mod.rs | 1 - .../polars-ops/src/series/ops/horizontal.rs | 374 +++++++++++++++++- .../src/dsl/function_expr/dispatch.rs | 2 +- crates/polars-python/src/conversion/mod.rs | 1 - 9 files changed, 359 insertions(+), 395 deletions(-) delete mode 100644 crates/polars-core/src/chunked_array/ops/min_max_binary.rs delete mode 100644 crates/polars-core/src/series/arithmetic/horizontal.rs diff --git a/crates/polars-core/src/chunked_array/ops/min_max_binary.rs b/crates/polars-core/src/chunked_array/ops/min_max_binary.rs deleted file mode 100644 index 28e7c491095b..000000000000 --- a/crates/polars-core/src/chunked_array/ops/min_max_binary.rs +++ /dev/null @@ -1,81 +0,0 @@ -use crate::prelude::*; -use crate::series::arithmetic::coerce_lhs_rhs; - -fn min_binary(left: &ChunkedArray, right: &ChunkedArray) -> ChunkedArray -where - T: PolarsNumericType, - T::Native: PartialOrd, -{ - let op = |l: T::Native, r: T::Native| { - if l < r { - l - } else { - r - } - }; - arity::binary_elementwise_values(left, right, op) -} - -fn max_binary(left: &ChunkedArray, right: &ChunkedArray) -> ChunkedArray -where - T: PolarsNumericType, - T::Native: PartialOrd, -{ - let op = |l: T::Native, r: T::Native| { - if l > r { - l - } else { - r - } - }; - arity::binary_elementwise_values(left, right, op) -} - -pub(crate) fn min_max_binary_columns( - left: &Column, - right: &Column, - min: bool, -) -> PolarsResult { - if left.dtype().to_physical().is_numeric() - && left.null_count() == 0 - && right.null_count() == 0 - && left.len() == right.len() - { - match (left, right) { - (Column::Series(left), Column::Series(right)) => { - let (lhs, rhs) = coerce_lhs_rhs(left, right)?; - let logical = lhs.dtype(); - let lhs = lhs.to_physical_repr(); - let rhs = rhs.to_physical_repr(); - - with_match_physical_numeric_polars_type!(lhs.dtype(), |$T| { - let a: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref(); - let b: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref(); - - if min { - min_binary(a, b).into_series().cast(logical) - } else { - max_binary(a, b).into_series().cast(logical) - } - }) - .map(Column::from) - }, - _ => { - let mask = if min { - left.lt(right)? - } else { - left.gt(right)? - }; - - left.zip_with(&mask, right) - }, - } - } else { - let mask = if min { - left.lt(right)? & left.is_not_null() | right.is_null() - } else { - left.gt(right)? 
& left.is_not_null() | right.is_null() - }; - left.zip_with(&mask, right) - } -} diff --git a/crates/polars-core/src/chunked_array/ops/mod.rs b/crates/polars-core/src/chunked_array/ops/mod.rs index c0daaa72bdf6..d598199bfecb 100644 --- a/crates/polars-core/src/chunked_array/ops/mod.rs +++ b/crates/polars-core/src/chunked_array/ops/mod.rs @@ -28,8 +28,6 @@ pub mod float_sorted_arg_max; mod for_each; pub mod full; pub mod gather; -#[cfg(feature = "zip_with")] -pub(crate) mod min_max_binary; pub(crate) mod nulls; mod reverse; #[cfg(feature = "rolling_window")] diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 09c9b18a03e7..301dd0a53517 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -11,7 +11,6 @@ use crate::chunked_array::metadata::MetadataFlags; #[cfg(feature = "algorithm_group_by")] use crate::chunked_array::ops::unique::is_unique_helper; use crate::prelude::*; -use crate::series::arithmetic::horizontal as series_horizontal; #[cfg(feature = "row_hash")] use crate::utils::split_df; use crate::utils::{slice_offsets, try_get_supertype, Container, NoNull}; @@ -43,12 +42,6 @@ use crate::prelude::sort::{argsort_multiple_row_fmt, prepare_arg_sort}; use crate::series::IsSorted; use crate::POOL; -#[derive(Copy, Clone, Debug)] -pub enum NullStrategy { - Ignore, - Propagate, -} - #[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash, IntoStaticStr)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[strum(serialize_all = "snake_case")] @@ -2791,28 +2784,6 @@ impl DataFrame { Ok(unsafe { DataFrame::new_no_checks(self.height(), col) }) } - /// Aggregate the column horizontally to their min values. - #[cfg(feature = "zip_with")] - pub fn min_horizontal(&self) -> PolarsResult> { - series_horizontal::min_horizontal(&self.columns) - } - - /// Aggregate the column horizontally to their max values. - #[cfg(feature = "zip_with")] - pub fn max_horizontal(&self) -> PolarsResult> { - series_horizontal::max_horizontal(&self.columns) - } - - /// Sum all values horizontally across columns. - pub fn sum_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult> { - series_horizontal::sum_horizontal(&self.columns, null_strategy) - } - - /// Compute the mean of all numeric values horizontally across columns. - pub fn mean_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult> { - series_horizontal::mean_horizontal(&self.columns, null_strategy) - } - /// Pipe different functions/ closure operations that work on a DataFrame together. 
pub fn pipe(self, f: F) -> PolarsResult where @@ -3515,45 +3486,6 @@ mod test { assert_eq!(df.height, 6) } - #[test] - #[cfg(feature = "zip_with")] - #[cfg_attr(miri, ignore)] - fn test_horizontal_agg() { - let a = Column::new("a".into(), [1, 2, 6]); - let b = Column::new("b".into(), [Some(1), None, None]); - let c = Column::new("c".into(), [Some(4), None, Some(3)]); - - let df = DataFrame::new(vec![a, b, c]).unwrap(); - assert_eq!( - Vec::from( - df.mean_horizontal(NullStrategy::Ignore) - .unwrap() - .unwrap() - .f64() - .unwrap() - ), - &[Some(2.0), Some(2.0), Some(4.5)] - ); - assert_eq!( - Vec::from( - df.sum_horizontal(NullStrategy::Ignore) - .unwrap() - .unwrap() - .i32() - .unwrap() - ), - &[Some(6), Some(2), Some(9)] - ); - assert_eq!( - Vec::from(df.min_horizontal().unwrap().unwrap().i32().unwrap()), - &[Some(1), Some(2), Some(3)] - ); - assert_eq!( - Vec::from(df.max_horizontal().unwrap().unwrap().i32().unwrap()), - &[Some(4), Some(2), Some(6)] - ); - } - #[test] fn test_replace_or_add() -> PolarsResult<()> { let mut df = df!( diff --git a/crates/polars-core/src/series/arithmetic/borrowed.rs b/crates/polars-core/src/series/arithmetic/borrowed.rs index d52916364b0e..de85c793681f 100644 --- a/crates/polars-core/src/series/arithmetic/borrowed.rs +++ b/crates/polars-core/src/series/arithmetic/borrowed.rs @@ -339,7 +339,7 @@ pub mod checked { } } -pub(crate) fn coerce_lhs_rhs<'a>( +pub fn coerce_lhs_rhs<'a>( lhs: &'a Series, rhs: &'a Series, ) -> PolarsResult<(Cow<'a, Series>, Cow<'a, Series>)> { diff --git a/crates/polars-core/src/series/arithmetic/horizontal.rs b/crates/polars-core/src/series/arithmetic/horizontal.rs deleted file mode 100644 index 94996b7302fe..000000000000 --- a/crates/polars-core/src/series/arithmetic/horizontal.rs +++ /dev/null @@ -1,223 +0,0 @@ -use std::borrow::Cow; - -use polars_error::{polars_bail, PolarsResult}; -use polars_utils::pl_str::PlSmallStr; -use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; - -#[cfg(feature = "zip_with")] -use super::min_max_binary::min_max_binary_columns; -use super::{ - ChunkCompareEq, ChunkSet, Column, DataType, FillNullStrategy, IntoColumn, Scalar, Series, - UInt32Chunked, -}; -use crate::chunked_array::cast::CastOptions; -use crate::frame::NullStrategy; -use crate::POOL; - -/// Aggregate the column horizontally to their min values. -/// -/// All columns need to be the same length or a scalar. -#[cfg(feature = "zip_with")] -pub fn min_horizontal(columns: &[Column]) -> PolarsResult> { - let min_fn = |acc: &Column, s: &Column| min_max_binary_columns(acc, s, true); - - match columns.len() { - 0 => Ok(None), - 1 => Ok(Some(columns[0].clone())), - 2 => min_fn(&columns[0], &columns[1]).map(Some), - _ => { - // the try_reduce_with is a bit slower in parallelism, - // but I don't think it matters here as we parallelize over columns, not over elements - POOL.install(|| { - columns - .par_iter() - .map(|s| Ok(Cow::Borrowed(s))) - .try_reduce_with(|l, r| min_fn(&l, &r).map(Cow::Owned)) - // we can unwrap the option, because we are certain there is a column - // we started this operation on 3 columns - .unwrap() - .map(|cow| Some(cow.into_owned())) - }) - }, - } -} - -/// Aggregate the column horizontally to their max values. -/// -/// All columns need to be the same length or a scalar. 
-#[cfg(feature = "zip_with")] -pub fn max_horizontal(columns: &[Column]) -> PolarsResult> { - let max_fn = |acc: &Column, s: &Column| min_max_binary_columns(acc, s, false); - - match columns.len() { - 0 => Ok(None), - 1 => Ok(Some(columns[0].clone())), - 2 => max_fn(&columns[0], &columns[1]).map(Some), - _ => { - // the try_reduce_with is a bit slower in parallelism, - // but I don't think it matters here as we parallelize over columns, not over elements - POOL.install(|| { - columns - .par_iter() - .map(|s| Ok(Cow::Borrowed(s))) - .try_reduce_with(|l, r| max_fn(&l, &r).map(Cow::Owned)) - // we can unwrap the option, because we are certain there is a column - // we started this operation on 3 columns - .unwrap() - .map(|cow| Some(cow.into_owned())) - }) - }, - } -} - -/// Sum all values horizontally across columns. -/// -/// All columns need to be the same length or a scalar. -pub fn sum_horizontal( - columns: &[Column], - null_strategy: NullStrategy, -) -> PolarsResult> { - let apply_null_strategy = |s: Series, null_strategy: NullStrategy| -> PolarsResult { - if let NullStrategy::Ignore = null_strategy { - // if has nulls - if s.null_count() > 0 { - return s.fill_null(FillNullStrategy::Zero); - } - } - Ok(s) - }; - - let sum_fn = |acc: Series, s: Series, null_strategy: NullStrategy| -> PolarsResult { - let acc: Series = apply_null_strategy(acc, null_strategy)?; - let s = apply_null_strategy(s, null_strategy)?; - // This will do owned arithmetic and can be mutable - std::ops::Add::add(acc, s) - }; - - // @scalar-opt - let non_null_cols = columns - .iter() - .filter(|x| x.dtype() != &DataType::Null) - .map(|c| c.as_materialized_series()) - .collect::>(); - - match non_null_cols.len() { - 0 => { - if columns.is_empty() { - Ok(None) - } else { - // all columns are null dtype, so result is null dtype - Ok(Some(columns[0].clone())) - } - }, - 1 => Ok(Some( - apply_null_strategy( - if non_null_cols[0].dtype() == &DataType::Boolean { - non_null_cols[0].cast(&DataType::UInt32)? - } else { - non_null_cols[0].clone() - }, - null_strategy, - )? - .into(), - )), - 2 => sum_fn( - non_null_cols[0].clone(), - non_null_cols[1].clone(), - null_strategy, - ) - .map(Column::from) - .map(Some), - _ => { - // the try_reduce_with is a bit slower in parallelism, - // but I don't think it matters here as we parallelize over columns, not over elements - let out = POOL.install(|| { - non_null_cols - .into_par_iter() - .cloned() - .map(Ok) - .try_reduce_with(|l, r| sum_fn(l, r, null_strategy)) - // We can unwrap because we started with at least 3 columns, so we always get a Some - .unwrap() - }); - out.map(Column::from).map(Some) - }, - } -} - -/// Compute the mean of all values horizontally across columns. -/// -/// All columns need to be the same length or a scalar. 
-pub fn mean_horizontal( - columns: &[Column], - null_strategy: NullStrategy, -) -> PolarsResult> { - let (numeric_columns, non_numeric_columns): (Vec<_>, Vec<_>) = columns.iter().partition(|s| { - let dtype = s.dtype(); - dtype.is_numeric() || dtype.is_decimal() || dtype.is_bool() || dtype.is_null() - }); - - if !non_numeric_columns.is_empty() { - let col = non_numeric_columns.first().cloned(); - polars_bail!( - InvalidOperation: "'horizontal_mean' expects numeric expressions, found {:?} (dtype={})", - col.unwrap().name(), - col.unwrap().dtype(), - ); - } - let columns = numeric_columns.into_iter().cloned().collect::>(); - match columns.len() { - 0 => Ok(None), - 1 => Ok(Some(match columns[0].dtype() { - dt if dt != &DataType::Float32 && !dt.is_decimal() => { - columns[0].cast(&DataType::Float64)? - }, - _ => columns[0].clone(), - })), - _ => { - let sum = || sum_horizontal(&columns, null_strategy); - let null_count = || { - columns - .par_iter() - .map(|c| { - c.is_null() - .into_column() - .cast_with_options(&DataType::UInt32, CastOptions::NonStrict) - }) - .reduce_with(|l, r| { - let l = l?; - let r = r?; - let result = std::ops::Add::add(&l, &r)?; - PolarsResult::Ok(result) - }) - // we can unwrap the option, because we are certain there is a column - // we started this operation on 2 columns - .unwrap() - }; - - let (sum, null_count) = POOL.install(|| rayon::join(sum, null_count)); - let sum = sum?; - let null_count = null_count?; - - // value lengths: len - null_count - let value_length: UInt32Chunked = (Column::new_scalar( - PlSmallStr::EMPTY, - Scalar::from(columns.len() as u32), - null_count.len(), - ) - null_count)? - .u32() - .unwrap() - .clone(); - - // make sure that we do not divide by zero - // by replacing with None - let value_length = value_length - .set(&value_length.equal(0), None)? - .into_column() - .cast(&DataType::Float64)?; - - sum.map(|sum| std::ops::Div::div(&sum, &value_length)) - .transpose() - }, - } -} diff --git a/crates/polars-core/src/series/arithmetic/mod.rs b/crates/polars-core/src/series/arithmetic/mod.rs index 7cb1fd4674f2..8a4d317276c9 100644 --- a/crates/polars-core/src/series/arithmetic/mod.rs +++ b/crates/polars-core/src/series/arithmetic/mod.rs @@ -1,6 +1,5 @@ mod bitops; mod borrowed; -pub mod horizontal; mod list; mod owned; diff --git a/crates/polars-ops/src/series/ops/horizontal.rs b/crates/polars-ops/src/series/ops/horizontal.rs index 7e5cec8c1474..2e96ab27394a 100644 --- a/crates/polars-ops/src/series/ops/horizontal.rs +++ b/crates/polars-ops/src/series/ops/horizontal.rs @@ -1,5 +1,10 @@ -use polars_core::frame::NullStrategy; +use std::borrow::Cow; + +use polars_core::chunked_array::cast::CastOptions; use polars_core::prelude::*; +use polars_core::series::arithmetic::coerce_lhs_rhs; +use polars_core::{with_match_physical_numeric_polars_type, POOL}; +use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; fn validate_column_lengths(cs: &[Column]) -> PolarsResult<()> { let mut length = 1; @@ -16,28 +21,320 @@ fn validate_column_lengths(cs: &[Column]) -> PolarsResult<()> { Ok(()) } -pub fn max_horizontal(s: &[Column]) -> PolarsResult> { - validate_column_lengths(s)?; - polars_core::series::arithmetic::horizontal::max_horizontal(s) - .map(|opt_c| opt_c.map(|res| res.with_name(s[0].name().clone()))) +pub trait MinMaxHorizontal { + /// Aggregate the column horizontally to their min values. + fn min_horizontal(&self) -> PolarsResult>; + /// Aggregate the column horizontally to their max values. 
+ fn max_horizontal(&self) -> PolarsResult>; +} + +impl MinMaxHorizontal for DataFrame { + fn min_horizontal(&self) -> PolarsResult> { + min_horizontal(self.get_columns()) + } + fn max_horizontal(&self) -> PolarsResult> { + max_horizontal(self.get_columns()) + } +} + +#[derive(Copy, Clone, Debug)] +pub enum NullStrategy { + Ignore, + Propagate, +} + +pub trait SumMeanHorizontal { + /// Sum all values horizontally across columns. + fn sum_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult>; + + /// Compute the mean of all numeric values horizontally across columns. + fn mean_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult>; +} + +impl SumMeanHorizontal for DataFrame { + fn sum_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult> { + sum_horizontal(self.get_columns(), null_strategy) + } + fn mean_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult> { + mean_horizontal(self.get_columns(), null_strategy) + } +} + +fn min_binary(left: &ChunkedArray, right: &ChunkedArray) -> ChunkedArray +where + T: PolarsNumericType, + T::Native: PartialOrd, +{ + let op = |l: T::Native, r: T::Native| { + if l < r { + l + } else { + r + } + }; + arity::binary_elementwise_values(left, right, op) +} + +fn max_binary(left: &ChunkedArray, right: &ChunkedArray) -> ChunkedArray +where + T: PolarsNumericType, + T::Native: PartialOrd, +{ + let op = |l: T::Native, r: T::Native| { + if l > r { + l + } else { + r + } + }; + arity::binary_elementwise_values(left, right, op) +} + +fn min_max_binary_columns(left: &Column, right: &Column, min: bool) -> PolarsResult { + if left.dtype().to_physical().is_numeric() + && left.null_count() == 0 + && right.null_count() == 0 + && left.len() == right.len() + { + match (left, right) { + (Column::Series(left), Column::Series(right)) => { + let (lhs, rhs) = coerce_lhs_rhs(left, right)?; + let logical = lhs.dtype(); + let lhs = lhs.to_physical_repr(); + let rhs = rhs.to_physical_repr(); + + with_match_physical_numeric_polars_type!(lhs.dtype(), |$T| { + let a: &ChunkedArray<$T> = lhs.as_ref().as_ref().as_ref(); + let b: &ChunkedArray<$T> = rhs.as_ref().as_ref().as_ref(); + + if min { + min_binary(a, b).into_series().cast(logical) + } else { + max_binary(a, b).into_series().cast(logical) + } + }) + .map(Column::from) + }, + _ => { + let mask = if min { + left.lt(right)? + } else { + left.gt(right)? + }; + + left.zip_with(&mask, right) + }, + } + } else { + let mask = if min { + left.lt(right)? & left.is_not_null() | right.is_null() + } else { + left.gt(right)? 
& left.is_not_null() | right.is_null() + }; + left.zip_with(&mask, right) + } +} + +pub fn max_horizontal(columns: &[Column]) -> PolarsResult> { + validate_column_lengths(columns)?; + + let max_fn = |acc: &Column, s: &Column| min_max_binary_columns(acc, s, false); + + match columns.len() { + 0 => Ok(None), + 1 => Ok(Some(columns[0].clone())), + 2 => max_fn(&columns[0], &columns[1]).map(Some), + _ => { + // the try_reduce_with is a bit slower in parallelism, + // but I don't think it matters here as we parallelize over columns, not over elements + POOL.install(|| { + columns + .par_iter() + .map(|s| Ok(Cow::Borrowed(s))) + .try_reduce_with(|l, r| max_fn(&l, &r).map(Cow::Owned)) + // we can unwrap the option, because we are certain there is a column + // we started this operation on 3 columns + .unwrap() + .map(|cow| Some(cow.into_owned())) + }) + }, + } } -pub fn min_horizontal(s: &[Column]) -> PolarsResult> { - validate_column_lengths(s)?; - polars_core::series::arithmetic::horizontal::min_horizontal(s) - .map(|opt_c| opt_c.map(|res| res.with_name(s[0].name().clone()))) +pub fn min_horizontal(columns: &[Column]) -> PolarsResult> { + validate_column_lengths(columns)?; + + let min_fn = |acc: &Column, s: &Column| min_max_binary_columns(acc, s, true); + + match columns.len() { + 0 => Ok(None), + 1 => Ok(Some(columns[0].clone())), + 2 => min_fn(&columns[0], &columns[1]).map(Some), + _ => { + // the try_reduce_with is a bit slower in parallelism, + // but I don't think it matters here as we parallelize over columns, not over elements + POOL.install(|| { + columns + .par_iter() + .map(|s| Ok(Cow::Borrowed(s))) + .try_reduce_with(|l, r| min_fn(&l, &r).map(Cow::Owned)) + // we can unwrap the option, because we are certain there is a column + // we started this operation on 3 columns + .unwrap() + .map(|cow| Some(cow.into_owned())) + }) + }, + } } -pub fn sum_horizontal(s: &[Column], null_strategy: NullStrategy) -> PolarsResult> { - validate_column_lengths(s)?; - polars_core::series::arithmetic::horizontal::sum_horizontal(s, null_strategy) - .map(|opt_c| opt_c.map(|res| res.with_name(s[0].name().clone()))) +pub fn sum_horizontal( + columns: &[Column], + null_strategy: NullStrategy, +) -> PolarsResult> { + validate_column_lengths(columns)?; + + let apply_null_strategy = |s: Series, null_strategy: NullStrategy| -> PolarsResult { + if let NullStrategy::Ignore = null_strategy { + // if has nulls + if s.null_count() > 0 { + return s.fill_null(FillNullStrategy::Zero); + } + } + Ok(s) + }; + + let sum_fn = |acc: Series, s: Series, null_strategy: NullStrategy| -> PolarsResult { + let acc: Series = apply_null_strategy(acc, null_strategy)?; + let s = apply_null_strategy(s, null_strategy)?; + // This will do owned arithmetic and can be mutable + std::ops::Add::add(acc, s) + }; + + // @scalar-opt + let non_null_cols = columns + .iter() + .filter(|x| x.dtype() != &DataType::Null) + .map(|c| c.as_materialized_series()) + .collect::>(); + + match non_null_cols.len() { + 0 => { + if columns.is_empty() { + Ok(None) + } else { + // all columns are null dtype, so result is null dtype + Ok(Some(columns[0].clone())) + } + }, + 1 => Ok(Some( + apply_null_strategy( + if non_null_cols[0].dtype() == &DataType::Boolean { + non_null_cols[0].cast(&DataType::UInt32)? + } else { + non_null_cols[0].clone() + }, + null_strategy, + )? 
+ .into(), + )), + 2 => sum_fn( + non_null_cols[0].clone(), + non_null_cols[1].clone(), + null_strategy, + ) + .map(Column::from) + .map(Some), + _ => { + // the try_reduce_with is a bit slower in parallelism, + // but I don't think it matters here as we parallelize over columns, not over elements + let out = POOL.install(|| { + non_null_cols + .into_par_iter() + .cloned() + .map(Ok) + .try_reduce_with(|l, r| sum_fn(l, r, null_strategy)) + // We can unwrap because we started with at least 3 columns, so we always get a Some + .unwrap() + }); + out.map(Column::from).map(Some) + }, + } } -pub fn mean_horizontal(s: &[Column], null_strategy: NullStrategy) -> PolarsResult> { - validate_column_lengths(s)?; - polars_core::series::arithmetic::horizontal::mean_horizontal(s, null_strategy) - .map(|opt_c| opt_c.map(|res| res.with_name(s[0].name().clone()))) +pub fn mean_horizontal( + columns: &[Column], + null_strategy: NullStrategy, +) -> PolarsResult> { + validate_column_lengths(columns)?; + + let (numeric_columns, non_numeric_columns): (Vec<_>, Vec<_>) = columns.iter().partition(|s| { + let dtype = s.dtype(); + dtype.is_numeric() || dtype.is_decimal() || dtype.is_bool() || dtype.is_null() + }); + + if !non_numeric_columns.is_empty() { + let col = non_numeric_columns.first().cloned(); + polars_bail!( + InvalidOperation: "'horizontal_mean' expects numeric expressions, found {:?} (dtype={})", + col.unwrap().name(), + col.unwrap().dtype(), + ); + } + let columns = numeric_columns.into_iter().cloned().collect::>(); + match columns.len() { + 0 => Ok(None), + 1 => Ok(Some(match columns[0].dtype() { + dt if dt != &DataType::Float32 && !dt.is_decimal() => { + columns[0].cast(&DataType::Float64)? + }, + _ => columns[0].clone(), + })), + _ => { + let sum = || sum_horizontal(columns.as_slice(), null_strategy); + let null_count = || { + columns + .par_iter() + .map(|c| { + c.is_null() + .into_column() + .cast_with_options(&DataType::UInt32, CastOptions::NonStrict) + }) + .reduce_with(|l, r| { + let l = l?; + let r = r?; + let result = std::ops::Add::add(&l, &r)?; + PolarsResult::Ok(result) + }) + // we can unwrap the option, because we are certain there is a column + // we started this operation on 2 columns + .unwrap() + }; + + let (sum, null_count) = POOL.install(|| rayon::join(sum, null_count)); + let sum = sum?; + let null_count = null_count?; + + // value lengths: len - null_count + let value_length: UInt32Chunked = (Column::new_scalar( + PlSmallStr::EMPTY, + Scalar::from(columns.len() as u32), + null_count.len(), + ) - null_count)? + .u32() + .unwrap() + .clone(); + + // make sure that we do not divide by zero + // by replacing with None + let value_length = value_length + .set(&value_length.equal(0), None)? 
+ .into_column() + .cast(&DataType::Float64)?; + + sum.map(|sum| std::ops::Div::div(&sum, &value_length)) + .transpose() + }, + } } pub fn coalesce_columns(s: &[Column]) -> PolarsResult { @@ -57,3 +354,46 @@ pub fn coalesce_columns(s: &[Column]) -> PolarsResult { } Ok(out) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + #[cfg_attr(miri, ignore)] + fn test_horizontal_agg() { + let a = Column::new("a".into(), [1, 2, 6]); + let b = Column::new("b".into(), [Some(1), None, None]); + let c = Column::new("c".into(), [Some(4), None, Some(3)]); + + let df = DataFrame::new(vec![a, b, c]).unwrap(); + assert_eq!( + Vec::from( + df.mean_horizontal(NullStrategy::Ignore) + .unwrap() + .unwrap() + .f64() + .unwrap() + ), + &[Some(2.0), Some(2.0), Some(4.5)] + ); + assert_eq!( + Vec::from( + df.sum_horizontal(NullStrategy::Ignore) + .unwrap() + .unwrap() + .i32() + .unwrap() + ), + &[Some(6), Some(2), Some(9)] + ); + assert_eq!( + Vec::from(df.min_horizontal().unwrap().unwrap().i32().unwrap()), + &[Some(1), Some(2), Some(3)] + ); + assert_eq!( + Vec::from(df.max_horizontal().unwrap().unwrap().i32().unwrap()), + &[Some(4), Some(2), Some(6)] + ); + } +} diff --git a/crates/polars-plan/src/dsl/function_expr/dispatch.rs b/crates/polars-plan/src/dsl/function_expr/dispatch.rs index 7a338553e1e1..573685d16177 100644 --- a/crates/polars-plan/src/dsl/function_expr/dispatch.rs +++ b/crates/polars-plan/src/dsl/function_expr/dispatch.rs @@ -1,4 +1,4 @@ -use polars_core::frame::NullStrategy; +use polars_ops::series::NullStrategy; use super::*; diff --git a/crates/polars-python/src/conversion/mod.rs b/crates/polars-python/src/conversion/mod.rs index ac3286aaab36..c99e7fcfecf2 100644 --- a/crates/polars-python/src/conversion/mod.rs +++ b/crates/polars-python/src/conversion/mod.rs @@ -9,7 +9,6 @@ use std::path::PathBuf; #[cfg(feature = "object")] use polars::chunked_array::object::PolarsObjectSafe; use polars::frame::row::Row; -use polars::frame::NullStrategy; #[cfg(feature = "avro")] use polars::io::avro::AvroCompression; #[cfg(feature = "cloud")]
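
Downstream impact of this move, for reference: `NullStrategy` and the horizontal reductions now live in `polars-ops`, both as free functions over `&[Column]` and as the new `MinMaxHorizontal` / `SumMeanHorizontal` traits implemented for `DataFrame`. Below is a minimal usage sketch; `polars_ops::series::NullStrategy` is the import path confirmed by the `dispatch.rs` hunk above, while re-exporting the traits and free functions at that same path is an assumption made for illustration, and `horizontal_example` is a placeholder name.

    // Post-move usage sketch. Assumes the horizontal items are re-exported at
    // `polars_ops::series`; the patch confirms this path for `NullStrategy`,
    // the trait/function paths are assumed.
    use polars_core::prelude::*;
    use polars_ops::series::{sum_horizontal, MinMaxHorizontal, NullStrategy, SumMeanHorizontal};

    fn horizontal_example(df: &DataFrame) -> PolarsResult<()> {
        // DataFrame methods now come from the polars-ops traits instead of
        // inherent methods on polars-core's DataFrame.
        let row_max: Option<Column> = df.max_horizontal()?;
        let row_sum: Option<Column> = df.sum_horizontal(NullStrategy::Ignore)?;

        // The free functions take a slice of columns and return an
        // Option<Column> (None for empty input).
        let total: Option<Column> = sum_horizontal(df.get_columns(), NullStrategy::Propagate)?;

        println!("{row_max:?} {row_sum:?} {total:?}");
        Ok(())
    }

Callers that previously used the inherent `DataFrame::sum_horizontal` and friends from polars-core only need to bring the corresponding trait into scope; the method signatures themselves are unchanged by this patch.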