From cc05ff2d319159bf3d87a6c157806563031009e8 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Tue, 3 Dec 2024 14:34:11 +0100 Subject: [PATCH] fix: Check shape for `*_horizontal` functions (#20130) --- crates/polars-core/src/frame/mod.rs | 181 +------------- .../src/series/arithmetic/horizontal.rs | 223 ++++++++++++++++++ .../polars-core/src/series/arithmetic/mod.rs | 1 + .../polars-ops/src/series/ops/horizontal.rs | 51 ++-- .../src/dsl/function_expr/dispatch.rs | 27 ++- .../polars-plan/src/dsl/function_expr/mod.rs | 28 ++- .../src/dsl/function_expr/schema.rs | 4 +- .../src/dsl/functions/horizontal.rs | 8 +- crates/polars-python/src/dataframe/general.rs | 39 --- .../src/functions/aggregation.rs | 8 +- crates/polars-python/src/lazyframe/visit.rs | 2 +- .../src/lazyframe/visitor/expr_nodes.rs | 8 +- py-polars/polars/dataframe/frame.py | 8 +- .../functions/aggregation/horizontal.py | 18 +- .../tests/unit/functions/test_horizontal.py | 21 ++ 15 files changed, 359 insertions(+), 268 deletions(-) create mode 100644 crates/polars-core/src/series/arithmetic/horizontal.rs create mode 100644 py-polars/tests/unit/functions/test_horizontal.py diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 2dc78741f977..09c9b18a03e7 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -1,6 +1,4 @@ //! DataFrame module. -#[cfg(feature = "zip_with")] -use std::borrow::Cow; use std::sync::OnceLock; use std::{mem, ops}; @@ -13,6 +11,7 @@ use crate::chunked_array::metadata::MetadataFlags; #[cfg(feature = "algorithm_group_by")] use crate::chunked_array::ops::unique::is_unique_helper; use crate::prelude::*; +use crate::series::arithmetic::horizontal as series_horizontal; #[cfg(feature = "row_hash")] use crate::utils::split_df; use crate::utils::{slice_offsets, try_get_supertype, Container, NoNull}; @@ -38,11 +37,8 @@ use polars_utils::pl_str::PlSmallStr; use serde::{Deserialize, Serialize}; use strum_macros::IntoStaticStr; -use crate::chunked_array::cast::CastOptions; #[cfg(feature = "row_hash")] use crate::hashing::_df_rows_to_hashes_threaded_vertical; -#[cfg(feature = "zip_with")] -use crate::prelude::min_max_binary::min_max_binary_columns; use crate::prelude::sort::{argsort_multiple_row_fmt, prepare_arg_sort}; use crate::series::IsSorted; use crate::POOL; @@ -2798,186 +2794,23 @@ impl DataFrame { /// Aggregate the column horizontally to their min values. #[cfg(feature = "zip_with")] pub fn min_horizontal(&self) -> PolarsResult> { - let min_fn = |acc: &Column, s: &Column| min_max_binary_columns(acc, s, true); - - match self.columns.len() { - 0 => Ok(None), - 1 => Ok(Some(self.columns[0].clone())), - 2 => min_fn(&self.columns[0], &self.columns[1]).map(Some), - _ => { - // the try_reduce_with is a bit slower in parallelism, - // but I don't think it matters here as we parallelize over columns, not over elements - POOL.install(|| { - self.columns - .par_iter() - .map(|s| Ok(Cow::Borrowed(s))) - .try_reduce_with(|l, r| min_fn(&l, &r).map(Cow::Owned)) - // we can unwrap the option, because we are certain there is a column - // we started this operation on 3 columns - .unwrap() - .map(|cow| Some(cow.into_owned())) - }) - }, - } + series_horizontal::min_horizontal(&self.columns) } /// Aggregate the column horizontally to their max values. #[cfg(feature = "zip_with")] pub fn max_horizontal(&self) -> PolarsResult> { - let max_fn = |acc: &Column, s: &Column| min_max_binary_columns(acc, s, false); - - match self.columns.len() { - 0 => Ok(None), - 1 => Ok(Some(self.columns[0].clone())), - 2 => max_fn(&self.columns[0], &self.columns[1]).map(Some), - _ => { - // the try_reduce_with is a bit slower in parallelism, - // but I don't think it matters here as we parallelize over columns, not over elements - POOL.install(|| { - self.columns - .par_iter() - .map(|s| Ok(Cow::Borrowed(s))) - .try_reduce_with(|l, r| max_fn(&l, &r).map(Cow::Owned)) - // we can unwrap the option, because we are certain there is a column - // we started this operation on 3 columns - .unwrap() - .map(|cow| Some(cow.into_owned())) - }) - }, - } + series_horizontal::max_horizontal(&self.columns) } /// Sum all values horizontally across columns. - pub fn sum_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult> { - let apply_null_strategy = - |s: Series, null_strategy: NullStrategy| -> PolarsResult { - if let NullStrategy::Ignore = null_strategy { - // if has nulls - if s.null_count() > 0 { - return s.fill_null(FillNullStrategy::Zero); - } - } - Ok(s) - }; - - let sum_fn = - |acc: Series, s: Series, null_strategy: NullStrategy| -> PolarsResult { - let acc: Series = apply_null_strategy(acc, null_strategy)?; - let s = apply_null_strategy(s, null_strategy)?; - // This will do owned arithmetic and can be mutable - std::ops::Add::add(acc, s) - }; - - let non_null_cols = self - .materialized_column_iter() - .filter(|x| x.dtype() != &DataType::Null) - .collect::>(); - - match non_null_cols.len() { - 0 => { - if self.columns.is_empty() { - Ok(None) - } else { - // all columns are null dtype, so result is null dtype - Ok(Some(self.columns[0].as_materialized_series().clone())) - } - }, - 1 => Ok(Some(apply_null_strategy( - if non_null_cols[0].dtype() == &DataType::Boolean { - non_null_cols[0].cast(&DataType::UInt32)? - } else { - non_null_cols[0].clone() - }, - null_strategy, - )?)), - 2 => sum_fn( - non_null_cols[0].clone(), - non_null_cols[1].clone(), - null_strategy, - ) - .map(Some), - _ => { - // the try_reduce_with is a bit slower in parallelism, - // but I don't think it matters here as we parallelize over columns, not over elements - let out = POOL.install(|| { - non_null_cols - .into_par_iter() - .cloned() - .map(Ok) - .try_reduce_with(|l, r| sum_fn(l, r, null_strategy)) - // We can unwrap because we started with at least 3 columns, so we always get a Some - .unwrap() - }); - out.map(Some) - }, - } + pub fn sum_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult> { + series_horizontal::sum_horizontal(&self.columns, null_strategy) } /// Compute the mean of all numeric values horizontally across columns. - pub fn mean_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult> { - let (numeric_columns, non_numeric_columns): (Vec<_>, Vec<_>) = - self.columns.iter().partition(|s| { - let dtype = s.dtype(); - dtype.is_numeric() || dtype.is_decimal() || dtype.is_bool() || dtype.is_null() - }); - - if !non_numeric_columns.is_empty() { - let col = non_numeric_columns.first().cloned(); - polars_bail!( - InvalidOperation: "'horizontal_mean' expects numeric expressions, found {:?} (dtype={})", - col.unwrap().name(), - col.unwrap().dtype(), - ); - } - let columns = numeric_columns.into_iter().cloned().collect::>(); - match columns.len() { - 0 => Ok(None), - 1 => Ok(Some(match columns[0].dtype() { - dt if dt != &DataType::Float32 && !dt.is_decimal() => columns[0] - .as_materialized_series() - .cast(&DataType::Float64)?, - _ => columns[0].as_materialized_series().clone(), - })), - _ => { - let numeric_df = unsafe { DataFrame::_new_no_checks_impl(self.height(), columns) }; - let sum = || numeric_df.sum_horizontal(null_strategy); - let null_count = || { - numeric_df - .par_materialized_column_iter() - .map(|s| { - s.is_null() - .cast_with_options(&DataType::UInt32, CastOptions::NonStrict) - }) - .reduce_with(|l, r| { - let l = l?; - let r = r?; - let result = std::ops::Add::add(&l, &r)?; - PolarsResult::Ok(result) - }) - // we can unwrap the option, because we are certain there is a column - // we started this operation on 2 columns - .unwrap() - }; - - let (sum, null_count) = POOL.install(|| rayon::join(sum, null_count)); - let sum = sum?; - let null_count = null_count?; - - // value lengths: len - null_count - let value_length: UInt32Chunked = - (numeric_df.width().sub(&null_count)).u32().unwrap().clone(); - - // make sure that we do not divide by zero - // by replacing with None - let value_length = value_length - .set(&value_length.equal(0), None)? - .into_series() - .cast(&DataType::Float64)?; - - sum.map(|sum| std::ops::Div::div(&sum, &value_length)) - .transpose() - }, - } + pub fn mean_horizontal(&self, null_strategy: NullStrategy) -> PolarsResult> { + series_horizontal::mean_horizontal(&self.columns, null_strategy) } /// Pipe different functions/ closure operations that work on a DataFrame together. diff --git a/crates/polars-core/src/series/arithmetic/horizontal.rs b/crates/polars-core/src/series/arithmetic/horizontal.rs new file mode 100644 index 000000000000..94996b7302fe --- /dev/null +++ b/crates/polars-core/src/series/arithmetic/horizontal.rs @@ -0,0 +1,223 @@ +use std::borrow::Cow; + +use polars_error::{polars_bail, PolarsResult}; +use polars_utils::pl_str::PlSmallStr; +use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; + +#[cfg(feature = "zip_with")] +use super::min_max_binary::min_max_binary_columns; +use super::{ + ChunkCompareEq, ChunkSet, Column, DataType, FillNullStrategy, IntoColumn, Scalar, Series, + UInt32Chunked, +}; +use crate::chunked_array::cast::CastOptions; +use crate::frame::NullStrategy; +use crate::POOL; + +/// Aggregate the column horizontally to their min values. +/// +/// All columns need to be the same length or a scalar. +#[cfg(feature = "zip_with")] +pub fn min_horizontal(columns: &[Column]) -> PolarsResult> { + let min_fn = |acc: &Column, s: &Column| min_max_binary_columns(acc, s, true); + + match columns.len() { + 0 => Ok(None), + 1 => Ok(Some(columns[0].clone())), + 2 => min_fn(&columns[0], &columns[1]).map(Some), + _ => { + // the try_reduce_with is a bit slower in parallelism, + // but I don't think it matters here as we parallelize over columns, not over elements + POOL.install(|| { + columns + .par_iter() + .map(|s| Ok(Cow::Borrowed(s))) + .try_reduce_with(|l, r| min_fn(&l, &r).map(Cow::Owned)) + // we can unwrap the option, because we are certain there is a column + // we started this operation on 3 columns + .unwrap() + .map(|cow| Some(cow.into_owned())) + }) + }, + } +} + +/// Aggregate the column horizontally to their max values. +/// +/// All columns need to be the same length or a scalar. +#[cfg(feature = "zip_with")] +pub fn max_horizontal(columns: &[Column]) -> PolarsResult> { + let max_fn = |acc: &Column, s: &Column| min_max_binary_columns(acc, s, false); + + match columns.len() { + 0 => Ok(None), + 1 => Ok(Some(columns[0].clone())), + 2 => max_fn(&columns[0], &columns[1]).map(Some), + _ => { + // the try_reduce_with is a bit slower in parallelism, + // but I don't think it matters here as we parallelize over columns, not over elements + POOL.install(|| { + columns + .par_iter() + .map(|s| Ok(Cow::Borrowed(s))) + .try_reduce_with(|l, r| max_fn(&l, &r).map(Cow::Owned)) + // we can unwrap the option, because we are certain there is a column + // we started this operation on 3 columns + .unwrap() + .map(|cow| Some(cow.into_owned())) + }) + }, + } +} + +/// Sum all values horizontally across columns. +/// +/// All columns need to be the same length or a scalar. +pub fn sum_horizontal( + columns: &[Column], + null_strategy: NullStrategy, +) -> PolarsResult> { + let apply_null_strategy = |s: Series, null_strategy: NullStrategy| -> PolarsResult { + if let NullStrategy::Ignore = null_strategy { + // if has nulls + if s.null_count() > 0 { + return s.fill_null(FillNullStrategy::Zero); + } + } + Ok(s) + }; + + let sum_fn = |acc: Series, s: Series, null_strategy: NullStrategy| -> PolarsResult { + let acc: Series = apply_null_strategy(acc, null_strategy)?; + let s = apply_null_strategy(s, null_strategy)?; + // This will do owned arithmetic and can be mutable + std::ops::Add::add(acc, s) + }; + + // @scalar-opt + let non_null_cols = columns + .iter() + .filter(|x| x.dtype() != &DataType::Null) + .map(|c| c.as_materialized_series()) + .collect::>(); + + match non_null_cols.len() { + 0 => { + if columns.is_empty() { + Ok(None) + } else { + // all columns are null dtype, so result is null dtype + Ok(Some(columns[0].clone())) + } + }, + 1 => Ok(Some( + apply_null_strategy( + if non_null_cols[0].dtype() == &DataType::Boolean { + non_null_cols[0].cast(&DataType::UInt32)? + } else { + non_null_cols[0].clone() + }, + null_strategy, + )? + .into(), + )), + 2 => sum_fn( + non_null_cols[0].clone(), + non_null_cols[1].clone(), + null_strategy, + ) + .map(Column::from) + .map(Some), + _ => { + // the try_reduce_with is a bit slower in parallelism, + // but I don't think it matters here as we parallelize over columns, not over elements + let out = POOL.install(|| { + non_null_cols + .into_par_iter() + .cloned() + .map(Ok) + .try_reduce_with(|l, r| sum_fn(l, r, null_strategy)) + // We can unwrap because we started with at least 3 columns, so we always get a Some + .unwrap() + }); + out.map(Column::from).map(Some) + }, + } +} + +/// Compute the mean of all values horizontally across columns. +/// +/// All columns need to be the same length or a scalar. +pub fn mean_horizontal( + columns: &[Column], + null_strategy: NullStrategy, +) -> PolarsResult> { + let (numeric_columns, non_numeric_columns): (Vec<_>, Vec<_>) = columns.iter().partition(|s| { + let dtype = s.dtype(); + dtype.is_numeric() || dtype.is_decimal() || dtype.is_bool() || dtype.is_null() + }); + + if !non_numeric_columns.is_empty() { + let col = non_numeric_columns.first().cloned(); + polars_bail!( + InvalidOperation: "'horizontal_mean' expects numeric expressions, found {:?} (dtype={})", + col.unwrap().name(), + col.unwrap().dtype(), + ); + } + let columns = numeric_columns.into_iter().cloned().collect::>(); + match columns.len() { + 0 => Ok(None), + 1 => Ok(Some(match columns[0].dtype() { + dt if dt != &DataType::Float32 && !dt.is_decimal() => { + columns[0].cast(&DataType::Float64)? + }, + _ => columns[0].clone(), + })), + _ => { + let sum = || sum_horizontal(&columns, null_strategy); + let null_count = || { + columns + .par_iter() + .map(|c| { + c.is_null() + .into_column() + .cast_with_options(&DataType::UInt32, CastOptions::NonStrict) + }) + .reduce_with(|l, r| { + let l = l?; + let r = r?; + let result = std::ops::Add::add(&l, &r)?; + PolarsResult::Ok(result) + }) + // we can unwrap the option, because we are certain there is a column + // we started this operation on 2 columns + .unwrap() + }; + + let (sum, null_count) = POOL.install(|| rayon::join(sum, null_count)); + let sum = sum?; + let null_count = null_count?; + + // value lengths: len - null_count + let value_length: UInt32Chunked = (Column::new_scalar( + PlSmallStr::EMPTY, + Scalar::from(columns.len() as u32), + null_count.len(), + ) - null_count)? + .u32() + .unwrap() + .clone(); + + // make sure that we do not divide by zero + // by replacing with None + let value_length = value_length + .set(&value_length.equal(0), None)? + .into_column() + .cast(&DataType::Float64)?; + + sum.map(|sum| std::ops::Div::div(&sum, &value_length)) + .transpose() + }, + } +} diff --git a/crates/polars-core/src/series/arithmetic/mod.rs b/crates/polars-core/src/series/arithmetic/mod.rs index 8a4d317276c9..7cb1fd4674f2 100644 --- a/crates/polars-core/src/series/arithmetic/mod.rs +++ b/crates/polars-core/src/series/arithmetic/mod.rs @@ -1,5 +1,6 @@ mod bitops; mod borrowed; +pub mod horizontal; mod list; mod owned; diff --git a/crates/polars-ops/src/series/ops/horizontal.rs b/crates/polars-ops/src/series/ops/horizontal.rs index 663ac3664c8e..7e5cec8c1474 100644 --- a/crates/polars-ops/src/series/ops/horizontal.rs +++ b/crates/polars-ops/src/series/ops/horizontal.rs @@ -1,36 +1,43 @@ use polars_core::frame::NullStrategy; use polars_core::prelude::*; +fn validate_column_lengths(cs: &[Column]) -> PolarsResult<()> { + let mut length = 1; + for c in cs { + let len = c.len(); + if len != 1 && len != length { + if length == 1 { + length = len; + } else { + polars_bail!(ShapeMismatch: "cannot evaluate two Series of different lengths ({len} and {length})"); + } + } + } + Ok(()) +} + pub fn max_horizontal(s: &[Column]) -> PolarsResult> { - let df = - unsafe { DataFrame::_new_no_checks_impl(s.first().map_or(0, Column::len), Vec::from(s)) }; - df.max_horizontal() - .map(|s| s.map(Column::from)) - .map(|opt_s| opt_s.map(|res| res.with_name(s[0].name().clone()))) + validate_column_lengths(s)?; + polars_core::series::arithmetic::horizontal::max_horizontal(s) + .map(|opt_c| opt_c.map(|res| res.with_name(s[0].name().clone()))) } pub fn min_horizontal(s: &[Column]) -> PolarsResult> { - let df = - unsafe { DataFrame::_new_no_checks_impl(s.first().map_or(0, Column::len), Vec::from(s)) }; - df.min_horizontal() - .map(|s| s.map(Column::from)) - .map(|opt_s| opt_s.map(|res| res.with_name(s[0].name().clone()))) + validate_column_lengths(s)?; + polars_core::series::arithmetic::horizontal::min_horizontal(s) + .map(|opt_c| opt_c.map(|res| res.with_name(s[0].name().clone()))) } -pub fn sum_horizontal(s: &[Column]) -> PolarsResult> { - let df = - unsafe { DataFrame::_new_no_checks_impl(s.first().map_or(0, Column::len), Vec::from(s)) }; - df.sum_horizontal(NullStrategy::Ignore) - .map(|s| s.map(Column::from)) - .map(|opt_s| opt_s.map(|res| res.with_name(s[0].name().clone()))) +pub fn sum_horizontal(s: &[Column], null_strategy: NullStrategy) -> PolarsResult> { + validate_column_lengths(s)?; + polars_core::series::arithmetic::horizontal::sum_horizontal(s, null_strategy) + .map(|opt_c| opt_c.map(|res| res.with_name(s[0].name().clone()))) } -pub fn mean_horizontal(s: &[Column]) -> PolarsResult> { - let df = - unsafe { DataFrame::_new_no_checks_impl(s.first().map_or(0, Column::len), Vec::from(s)) }; - df.mean_horizontal(NullStrategy::Ignore) - .map(|s| s.map(Column::from)) - .map(|opt_s| opt_s.map(|res| res.with_name(s[0].name().clone()))) +pub fn mean_horizontal(s: &[Column], null_strategy: NullStrategy) -> PolarsResult> { + validate_column_lengths(s)?; + polars_core::series::arithmetic::horizontal::mean_horizontal(s, null_strategy) + .map(|opt_c| opt_c.map(|res| res.with_name(s[0].name().clone()))) } pub fn coalesce_columns(s: &[Column]) -> PolarsResult { diff --git a/crates/polars-plan/src/dsl/function_expr/dispatch.rs b/crates/polars-plan/src/dsl/function_expr/dispatch.rs index 895803cbcc63..7a338553e1e1 100644 --- a/crates/polars-plan/src/dsl/function_expr/dispatch.rs +++ b/crates/polars-plan/src/dsl/function_expr/dispatch.rs @@ -1,3 +1,5 @@ +use polars_core::frame::NullStrategy; + use super::*; pub(super) fn reverse(s: &Column) -> PolarsResult { @@ -103,12 +105,25 @@ pub(super) fn min_horizontal(s: &mut [Column]) -> PolarsResult> { polars_ops::prelude::min_horizontal(s) } -pub(super) fn sum_horizontal(s: &mut [Column]) -> PolarsResult> { - polars_ops::prelude::sum_horizontal(s) -} - -pub(super) fn mean_horizontal(s: &mut [Column]) -> PolarsResult> { - polars_ops::prelude::mean_horizontal(s) +pub(super) fn sum_horizontal(s: &mut [Column], ignore_nulls: bool) -> PolarsResult> { + let null_strategy = if ignore_nulls { + NullStrategy::Ignore + } else { + NullStrategy::Propagate + }; + polars_ops::prelude::sum_horizontal(s, null_strategy) +} + +pub(super) fn mean_horizontal( + s: &mut [Column], + ignore_nulls: bool, +) -> PolarsResult> { + let null_strategy = if ignore_nulls { + NullStrategy::Ignore + } else { + NullStrategy::Propagate + }; + polars_ops::prelude::mean_horizontal(s, null_strategy) } pub(super) fn drop_nulls(s: &Column) -> PolarsResult { diff --git a/crates/polars-plan/src/dsl/function_expr/mod.rs b/crates/polars-plan/src/dsl/function_expr/mod.rs index 5813fa7a72cd..d694fcd0409e 100644 --- a/crates/polars-plan/src/dsl/function_expr/mod.rs +++ b/crates/polars-plan/src/dsl/function_expr/mod.rs @@ -333,8 +333,12 @@ pub enum FunctionExpr { }, MaxHorizontal, MinHorizontal, - SumHorizontal, - MeanHorizontal, + SumHorizontal { + ignore_nulls: bool, + }, + MeanHorizontal { + ignore_nulls: bool, + }, #[cfg(feature = "ewma")] EwmMean { options: EWMOptions, @@ -420,8 +424,16 @@ impl Hash for FunctionExpr { lib.hash(state); symbol.hash(state); }, - MaxHorizontal | MinHorizontal | SumHorizontal | MeanHorizontal | DropNans - | DropNulls | Reverse | ArgUnique | Shift | ShiftAndFill => {}, + MaxHorizontal + | MinHorizontal + | SumHorizontal { .. } + | MeanHorizontal { .. } + | DropNans + | DropNulls + | Reverse + | ArgUnique + | Shift + | ShiftAndFill => {}, #[cfg(feature = "mode")] Mode => {}, #[cfg(feature = "abs")] @@ -760,8 +772,8 @@ impl Display for FunctionExpr { ForwardFill { .. } => "forward_fill", MaxHorizontal => "max_horizontal", MinHorizontal => "min_horizontal", - SumHorizontal => "sum_horizontal", - MeanHorizontal => "mean_horizontal", + SumHorizontal { .. } => "sum_horizontal", + MeanHorizontal { .. } => "mean_horizontal", #[cfg(feature = "ewma")] EwmMean { .. } => "ewm_mean", #[cfg(feature = "ewma_by")] @@ -1170,8 +1182,8 @@ impl From for SpecialEq> { ForwardFill { limit } => map!(dispatch::forward_fill, limit), MaxHorizontal => wrap!(dispatch::max_horizontal), MinHorizontal => wrap!(dispatch::min_horizontal), - SumHorizontal => wrap!(dispatch::sum_horizontal), - MeanHorizontal => wrap!(dispatch::mean_horizontal), + SumHorizontal { ignore_nulls } => wrap!(dispatch::sum_horizontal, ignore_nulls), + MeanHorizontal { ignore_nulls } => wrap!(dispatch::mean_horizontal, ignore_nulls), #[cfg(feature = "ewma")] EwmMean { options } => map!(ewm::ewm_mean, options), #[cfg(feature = "ewma_by")] diff --git a/crates/polars-plan/src/dsl/function_expr/schema.rs b/crates/polars-plan/src/dsl/function_expr/schema.rs index 8ac54c172993..018a3b0207d1 100644 --- a/crates/polars-plan/src/dsl/function_expr/schema.rs +++ b/crates/polars-plan/src/dsl/function_expr/schema.rs @@ -329,14 +329,14 @@ impl FunctionExpr { ForwardFill { .. } => mapper.with_same_dtype(), MaxHorizontal => mapper.map_to_supertype(), MinHorizontal => mapper.map_to_supertype(), - SumHorizontal => { + SumHorizontal { .. } => { if mapper.fields[0].dtype() == &DataType::Boolean { mapper.with_dtype(DataType::UInt32) } else { mapper.map_to_supertype() } }, - MeanHorizontal => mapper.map_to_float_dtype(), + MeanHorizontal { .. } => mapper.map_to_float_dtype(), #[cfg(feature = "ewma")] EwmMean { .. } => mapper.map_to_float_dtype(), #[cfg(feature = "ewma_by")] diff --git a/crates/polars-plan/src/dsl/functions/horizontal.rs b/crates/polars-plan/src/dsl/functions/horizontal.rs index 26b6209a720e..f81571c6ff32 100644 --- a/crates/polars-plan/src/dsl/functions/horizontal.rs +++ b/crates/polars-plan/src/dsl/functions/horizontal.rs @@ -274,13 +274,13 @@ pub fn min_horizontal>(exprs: E) -> PolarsResult { } /// Sum all values horizontally across columns. -pub fn sum_horizontal>(exprs: E) -> PolarsResult { +pub fn sum_horizontal>(exprs: E, ignore_nulls: bool) -> PolarsResult { let exprs = exprs.as_ref().to_vec(); polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown"); Ok(Expr::Function { input: exprs, - function: FunctionExpr::SumHorizontal, + function: FunctionExpr::SumHorizontal { ignore_nulls }, options: FunctionOptions { collect_groups: ApplyOptions::ElementWise, flags: FunctionFlags::default() @@ -292,13 +292,13 @@ pub fn sum_horizontal>(exprs: E) -> PolarsResult { } /// Compute the mean of all values horizontally across columns. -pub fn mean_horizontal>(exprs: E) -> PolarsResult { +pub fn mean_horizontal>(exprs: E, ignore_nulls: bool) -> PolarsResult { let exprs = exprs.as_ref().to_vec(); polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown"); Ok(Expr::Function { input: exprs, - function: FunctionExpr::MeanHorizontal, + function: FunctionExpr::MeanHorizontal { ignore_nulls }, options: FunctionOptions { collect_groups: ApplyOptions::ElementWise, flags: FunctionFlags::default() diff --git a/crates/polars-python/src/dataframe/general.rs b/crates/polars-python/src/dataframe/general.rs index 1c2f94368275..80741dea3700 100644 --- a/crates/polars-python/src/dataframe/general.rs +++ b/crates/polars-python/src/dataframe/general.rs @@ -3,7 +3,6 @@ use std::mem::ManuallyDrop; use either::Either; use polars::export::arrow::bitmap::MutableBitmap; use polars::prelude::*; -use polars_core::frame::*; #[cfg(feature = "pivot")] use polars_lazy::frame::pivot::{pivot, pivot_stable}; use polars_row::RowEncodingOptions; @@ -514,44 +513,6 @@ impl PyDataFrame { self.df.clone().lazy().into() } - pub fn max_horizontal(&self, py: Python) -> PyResult> { - let s = py - .allow_threads(|| self.df.max_horizontal()) - .map_err(PyPolarsErr::from)?; - Ok(s.map(|s| s.take_materialized_series().into())) - } - - pub fn min_horizontal(&self, py: Python) -> PyResult> { - let s = py - .allow_threads(|| self.df.min_horizontal()) - .map_err(PyPolarsErr::from)?; - Ok(s.map(|s| s.take_materialized_series().into())) - } - - pub fn sum_horizontal(&self, py: Python, ignore_nulls: bool) -> PyResult> { - let null_strategy = if ignore_nulls { - NullStrategy::Ignore - } else { - NullStrategy::Propagate - }; - let s = py - .allow_threads(|| self.df.sum_horizontal(null_strategy)) - .map_err(PyPolarsErr::from)?; - Ok(s.map(|s| s.into())) - } - - pub fn mean_horizontal(&self, py: Python, ignore_nulls: bool) -> PyResult> { - let null_strategy = if ignore_nulls { - NullStrategy::Ignore - } else { - NullStrategy::Propagate - }; - let s = py - .allow_threads(|| self.df.mean_horizontal(null_strategy)) - .map_err(PyPolarsErr::from)?; - Ok(s.map(|s| s.into())) - } - #[pyo3(signature = (columns, separator, drop_first=false))] pub fn to_dummies( &self, diff --git a/crates/polars-python/src/functions/aggregation.rs b/crates/polars-python/src/functions/aggregation.rs index 1d27ae8fee69..03c7f802f7ee 100644 --- a/crates/polars-python/src/functions/aggregation.rs +++ b/crates/polars-python/src/functions/aggregation.rs @@ -34,15 +34,15 @@ pub fn min_horizontal(exprs: Vec) -> PyResult { } #[pyfunction] -pub fn sum_horizontal(exprs: Vec) -> PyResult { +pub fn sum_horizontal(exprs: Vec, ignore_nulls: bool) -> PyResult { let exprs = exprs.to_exprs(); - let e = dsl::sum_horizontal(exprs).map_err(PyPolarsErr::from)?; + let e = dsl::sum_horizontal(exprs, ignore_nulls).map_err(PyPolarsErr::from)?; Ok(e.into()) } #[pyfunction] -pub fn mean_horizontal(exprs: Vec) -> PyResult { +pub fn mean_horizontal(exprs: Vec, ignore_nulls: bool) -> PyResult { let exprs = exprs.to_exprs(); - let e = dsl::mean_horizontal(exprs).map_err(PyPolarsErr::from)?; + let e = dsl::mean_horizontal(exprs, ignore_nulls).map_err(PyPolarsErr::from)?; Ok(e.into()) } diff --git a/crates/polars-python/src/lazyframe/visit.rs b/crates/polars-python/src/lazyframe/visit.rs index b698f68a47c7..27633e401301 100644 --- a/crates/polars-python/src/lazyframe/visit.rs +++ b/crates/polars-python/src/lazyframe/visit.rs @@ -57,7 +57,7 @@ impl NodeTraverser { // Increment major on breaking changes to the IR (e.g. renaming // fields, reordering tuples), minor on backwards compatible // changes (e.g. exposing a new expression node). - const VERSION: Version = (3, 2); + const VERSION: Version = (4, 2); pub fn new(root: Node, lp_arena: Arena, expr_arena: Arena) -> Self { Self { diff --git a/crates/polars-python/src/lazyframe/visitor/expr_nodes.rs b/crates/polars-python/src/lazyframe/visitor/expr_nodes.rs index c5cda028f74b..604398d78857 100644 --- a/crates/polars-python/src/lazyframe/visitor/expr_nodes.rs +++ b/crates/polars-python/src/lazyframe/visitor/expr_nodes.rs @@ -1326,9 +1326,13 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult { }, FunctionExpr::BackwardFill { limit } => ("backward_fill", limit).to_object(py), FunctionExpr::ForwardFill { limit } => ("forward_fill", limit).to_object(py), - FunctionExpr::SumHorizontal => ("sum_horizontal",).to_object(py), + FunctionExpr::SumHorizontal { ignore_nulls } => { + ("sum_horizontal", ignore_nulls).to_object(py) + }, FunctionExpr::MaxHorizontal => ("max_horizontal",).to_object(py), - FunctionExpr::MeanHorizontal => ("mean_horizontal",).to_object(py), + FunctionExpr::MeanHorizontal { ignore_nulls } => { + ("mean_horizontal", ignore_nulls).to_object(py) + }, FunctionExpr::MinHorizontal => ("min_horizontal",).to_object(py), FunctionExpr::EwmMean { options: _ } => { return Err(PyNotImplementedError::new_err("ewm mean")) diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 92b10e81ca45..49e1053e375d 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -9528,7 +9528,9 @@ def sum_horizontal(self, *, ignore_nulls: bool = True) -> Series: 9.0 ] """ - return wrap_s(self._df.sum_horizontal(ignore_nulls)).alias("sum") + return self.select( + sum=F.sum_horizontal(F.all(), ignore_nulls=ignore_nulls) + ).to_series() def mean(self) -> DataFrame: """ @@ -9588,7 +9590,9 @@ def mean_horizontal(self, *, ignore_nulls: bool = True) -> Series: 4.5 ] """ - return wrap_s(self._df.mean_horizontal(ignore_nulls)).alias("mean") + return self.select( + mean=F.mean_horizontal(F.all(), ignore_nulls=ignore_nulls) + ).to_series() def std(self, ddof: int = 1) -> DataFrame: """ diff --git a/py-polars/polars/functions/aggregation/horizontal.py b/py-polars/polars/functions/aggregation/horizontal.py index 5406a77d287d..ba72d42b609a 100644 --- a/py-polars/polars/functions/aggregation/horizontal.py +++ b/py-polars/polars/functions/aggregation/horizontal.py @@ -178,7 +178,9 @@ def min_horizontal(*exprs: IntoExpr | Iterable[IntoExpr]) -> Expr: return wrap_expr(plr.min_horizontal(pyexprs)) -def sum_horizontal(*exprs: IntoExpr | Iterable[IntoExpr]) -> Expr: +def sum_horizontal( + *exprs: IntoExpr | Iterable[IntoExpr], ignore_nulls: bool = True +) -> Expr: """ Sum all values horizontally across columns. @@ -187,6 +189,9 @@ def sum_horizontal(*exprs: IntoExpr | Iterable[IntoExpr]) -> Expr: *exprs Column(s) to use in the aggregation. Accepts expression input. Strings are parsed as column names, other non-expression inputs are parsed as literals. + ignore_nulls + Ignore null values (default). + If set to `False`, any null value in the input will lead to a null output. Examples -------- @@ -210,10 +215,12 @@ def sum_horizontal(*exprs: IntoExpr | Iterable[IntoExpr]) -> Expr: └─────┴──────┴─────┴─────┘ """ pyexprs = parse_into_list_of_expressions(*exprs) - return wrap_expr(plr.sum_horizontal(pyexprs)) + return wrap_expr(plr.sum_horizontal(pyexprs, ignore_nulls)) -def mean_horizontal(*exprs: IntoExpr | Iterable[IntoExpr]) -> Expr: +def mean_horizontal( + *exprs: IntoExpr | Iterable[IntoExpr], ignore_nulls: bool = True +) -> Expr: """ Compute the mean of all values horizontally across columns. @@ -222,6 +229,9 @@ def mean_horizontal(*exprs: IntoExpr | Iterable[IntoExpr]) -> Expr: *exprs Column(s) to use in the aggregation. Accepts expression input. Strings are parsed as column names, other non-expression inputs are parsed as literals. + ignore_nulls + Ignore null values (default). + If set to `False`, any null value in the input will lead to a null output. Examples -------- @@ -245,7 +255,7 @@ def mean_horizontal(*exprs: IntoExpr | Iterable[IntoExpr]) -> Expr: └─────┴──────┴─────┴──────┘ """ pyexprs = parse_into_list_of_expressions(*exprs) - return wrap_expr(plr.mean_horizontal(pyexprs)) + return wrap_expr(plr.mean_horizontal(pyexprs, ignore_nulls)) def cum_sum_horizontal(*exprs: IntoExpr | Iterable[IntoExpr]) -> Expr: diff --git a/py-polars/tests/unit/functions/test_horizontal.py b/py-polars/tests/unit/functions/test_horizontal.py new file mode 100644 index 000000000000..688e2bfcf58a --- /dev/null +++ b/py-polars/tests/unit/functions/test_horizontal.py @@ -0,0 +1,21 @@ +import pytest + +import polars as pl + + +@pytest.mark.parametrize( + "f", + [ + "min", + "max", + "sum", + "mean", + ], +) +def test_shape_mismatch_19336(f: str) -> None: + a = pl.Series([1, 2, 3]) + b = pl.Series([1, 2]) + fn = getattr(pl, f"{f}_horizontal") + + with pytest.raises(pl.exceptions.ShapeError): + pl.select((fn)(a, b))