Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Add reduce ComputeNode in new streaming engine #17389

Merged
merged 14 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ uuid = { version = "1.7.0", features = ["v4"] }
version_check = "0.9.4"
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
zstd = "0.13"
dyn-clone = "1"

polars = { version = "0.41.3", path = "crates/polars", default-features = false }
polars-compute = { version = "0.41.3", path = "crates/polars-compute", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion crates/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fmt: ## Run rustfmt and dprint

.PHONY: check
check: ## Run cargo check with all features
cargo check -p polars --all-features
cargo check -p polars --all-features -p polars-stream
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this supposed to be part of the PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe... Then we can ensure that all warnings are fixed. But if CI doesn't like it, I will remove it.


.PHONY: clippy
clippy: ## Run clippy with all features
Expand Down
38 changes: 2 additions & 36 deletions crates/polars-core/src/datatypes/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,6 @@ use polars_utils::sync::SyncPtr;
use polars_utils::total_ord::ToTotalOrd;
use polars_utils::unwrap::UnwrapUncheckedRelease;

// NOTE(review): this copy sits in the any_value.rs hunk whose header reports
// "2 additions & 36 deletions"; it appears to be the removed side of the diff,
// with the same definition re-added under scalar/mod.rs — confirm against the
// actual files.
/// A single value (`AnyValue<'static>`) stored together with a `DataType`.
#[derive(Clone)]
pub struct Scalar {
// Data type paired with `value`; used as the cast target in `as_any_value`.
dtype: DataType,
// The stored value; may differ in representation from `dtype` until cast.
value: AnyValue<'static>,
}

impl Scalar {
/// Creates a scalar from a data type and a value; no validation is performed here.
pub fn new(dtype: DataType, value: AnyValue<'static>) -> Self {
Self { dtype, value }
}

/// Returns a reference to the stored value as-is (no cast applied).
pub fn value(&self) -> &AnyValue<'static> {
&self.value
}

/// Returns the stored value strictly cast to this scalar's dtype.
///
/// When `strict_cast` yields `None`, a clone of the stored value is returned instead.
pub fn as_any_value(&self) -> AnyValue {
self.value
.strict_cast(&self.dtype)
.unwrap_or_else(|| self.value.clone())
}

/// Consumes the scalar and builds a one-element `Series` called `name`.
///
/// # Panics
/// Panics (via `unwrap`) if `Series::from_any_values_and_dtype` returns an error.
pub fn into_series(self, name: &str) -> Series {
Series::from_any_values_and_dtype(name, &[self.as_any_value()], &self.dtype, true).unwrap()
}

/// Returns a reference to the scalar's data type.
pub fn dtype(&self) -> &DataType {
&self.dtype
}

/// Replaces the stored value; the dtype is left untouched.
pub fn update(&mut self, value: AnyValue<'static>) {
self.value = value;
}
}

use super::*;
#[cfg(feature = "dtype-struct")]
use crate::prelude::any_value::arr_to_any_value;
Expand Down Expand Up @@ -854,8 +820,8 @@ impl<'a> AnyValue<'a> {
pub fn add(&self, rhs: &AnyValue) -> AnyValue<'static> {
use AnyValue::*;
match (self, rhs) {
(Null, _) => Null,
(_, Null) => Null,
(Null, r) => r.clone().into_static().unwrap(),
(l, Null) => l.clone().into_static().unwrap(),
(Int32(l), Int32(r)) => Int32(l + r),
(Int64(l), Int64(r)) => Int64(l + r),
(UInt32(l), UInt32(r)) => UInt32(l + r),
Expand Down
1 change: 1 addition & 0 deletions crates/polars-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod named_from;
pub mod prelude;
#[cfg(feature = "random")]
pub mod random;
pub mod scalar;
pub mod schema;
#[cfg(feature = "serde")]
pub mod serde;
Expand Down
1 change: 1 addition & 0 deletions crates/polars-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub use crate::frame::group_by::*;
pub use crate::frame::{DataFrame, UniqueKeepStrategy};
pub use crate::hashing::VecHash;
pub use crate::named_from::{NamedFrom, NamedFromOwned};
pub use crate::scalar::Scalar;
pub use crate::schema::*;
#[cfg(feature = "checked_arithmetic")]
pub use crate::series::arithmetic::checked::NumOpsDispatchChecked;
Expand Down
38 changes: 38 additions & 0 deletions crates/polars-core/src/scalar/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
pub mod reduce;

use crate::datatypes::{AnyValue, DataType};
use crate::prelude::Series;

/// A single value paired with the `DataType` it should be interpreted as.
#[derive(Clone)]
pub struct Scalar {
    /// Target data type; `as_any_value` casts the stored value to it.
    dtype: DataType,
    /// The stored value, kept exactly as it was supplied.
    value: AnyValue<'static>,
}

impl Scalar {
    /// Builds a scalar from its data type and value. Nothing is validated or cast here.
    pub fn new(dtype: DataType, value: AnyValue<'static>) -> Self {
        Scalar { dtype, value }
    }

    /// Borrows the data type of this scalar.
    pub fn dtype(&self) -> &DataType {
        &self.dtype
    }

    /// Borrows the stored value without applying any cast.
    pub fn value(&self) -> &AnyValue<'static> {
        &self.value
    }

    /// Overwrites the stored value, leaving the data type unchanged.
    pub fn update(&mut self, value: AnyValue<'static>) {
        self.value = value;
    }

    /// Returns the stored value strictly cast to the scalar's data type.
    ///
    /// If the strict cast produces `None`, a clone of the stored value is
    /// returned unchanged.
    pub fn as_any_value(&self) -> AnyValue {
        match self.value.strict_cast(&self.dtype) {
            Some(cast) => cast,
            None => self.value.clone(),
        }
    }

    /// Consumes the scalar and turns it into a one-element `Series` named `name`.
    ///
    /// # Panics
    /// Panics if `Series::from_any_values_and_dtype` fails, because its result
    /// is unwrapped.
    pub fn into_series(self, name: &str) -> Series {
        let element = self.as_any_value();
        let built = Series::from_any_values_and_dtype(name, &[element], &self.dtype, true);
        built.unwrap()
    }
}
35 changes: 35 additions & 0 deletions crates/polars-core/src/scalar/reduce.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::datatypes::{AnyValue, TimeUnit};
use crate::prelude::{DataType, Scalar, MS_IN_DAY};

/// Wraps an already computed mean in a [`Scalar`], choosing the output dtype
/// from the dtype of the data the mean was taken over.
///
/// * `Float32`: dtype is kept and the mean is narrowed to `f32`.
/// * Other numeric, decimal and boolean dtypes: the result is `Float64`.
/// * `Date`: the mean is multiplied by `MS_IN_DAY`, truncated to `i64`, and
///   returned as `Datetime(Milliseconds, None)`.
/// * `Datetime`, `Duration`, `Time`: dtype is kept and the mean is truncated
///   to `i64`.
/// * Any other dtype: a `Null` value carrying the input dtype.
///
/// The temporal cases only exist when the corresponding `dtype-*` feature is
/// enabled. A `None` mean goes through the `Option` to `AnyValue` conversion
/// (presumably producing a null value; confirm against that `From` impl).
pub fn mean_reduce(value: Option<f64>, dtype: DataType) -> Scalar {
    // `Float32` must be handled before the generic numeric check so that it
    // keeps its own dtype instead of being widened to `Float64`.
    if matches!(dtype, DataType::Float32) {
        let narrowed = value.map(|mean| mean as f32);
        return Scalar::new(dtype, narrowed.into());
    }

    if dtype.is_numeric() || dtype.is_decimal() || dtype.is_bool() {
        return Scalar::new(DataType::Float64, value.into());
    }

    match dtype {
        #[cfg(feature = "dtype-date")]
        DataType::Date => {
            // The mean is scaled by `MS_IN_DAY` and then truncated.
            let millis = value.map(|days| (days * MS_IN_DAY as f64) as i64);
            let target = DataType::Datetime(TimeUnit::Milliseconds, None);
            Scalar::new(target, millis.into())
        },
        #[cfg(feature = "dtype-datetime")]
        same @ DataType::Datetime(_, _) => {
            let truncated = value.map(|mean| mean as i64);
            Scalar::new(same, truncated.into())
        },
        #[cfg(feature = "dtype-duration")]
        same @ DataType::Duration(_) => {
            let truncated = value.map(|mean| mean as i64);
            Scalar::new(same, truncated.into())
        },
        #[cfg(feature = "dtype-time")]
        same @ DataType::Time => {
            let truncated = value.map(|mean| mean as i64);
            Scalar::new(same, truncated.into())
        },
        // No mean is produced for any other dtype.
        other => Scalar::new(other, AnyValue::Null),
    }
}
36 changes: 1 addition & 35 deletions crates/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,41 +808,7 @@ impl Series {
}

/// Reduces this series to its mean, returned as a `Scalar`.
pub fn mean_reduce(&self) -> Scalar {
// NOTE(review): this span is a rendered diff without +/- markers. The hunk header
// reports "1 addition & 35 deletions", so the `match` below is presumably the
// removed body and the final delegating call the added one — confirm against
// the actual file.
match self.dtype() {
DataType::Float32 => {
let val = self.mean().map(|m| m as f32);
Scalar::new(self.dtype().clone(), val.into())
},
dt if dt.is_numeric() || dt.is_decimal() || dt.is_bool() => {
let val = self.mean();
Scalar::new(DataType::Float64, val.into())
},
#[cfg(feature = "dtype-date")]
DataType::Date => {
let val = self.mean().map(|v| (v * MS_IN_DAY as f64) as i64);
let av: AnyValue = val.into();
Scalar::new(DataType::Datetime(TimeUnit::Milliseconds, None), av)
},
#[cfg(feature = "dtype-datetime")]
dt @ DataType::Datetime(_, _) => {
let val = self.mean().map(|v| v as i64);
let av: AnyValue = val.into();
Scalar::new(dt.clone(), av)
},
#[cfg(feature = "dtype-duration")]
dt @ DataType::Duration(_) => {
let val = self.mean().map(|v| v as i64);
let av: AnyValue = val.into();
Scalar::new(dt.clone(), av)
},
#[cfg(feature = "dtype-time")]
dt @ DataType::Time => {
let val = self.mean().map(|v| v as i64);
let av: AnyValue = val.into();
Scalar::new(dt.clone(), av)
},
dt => Scalar::new(dt.clone(), AnyValue::Null),
}
// Delegates the dtype-dependent wrapping to the shared helper, passing the
// computed mean and a clone of this series' dtype.
crate::scalar::reduce::mean_reduce(self.mean(), self.dtype().clone())
}

/// Compute the unique elements, but maintain order. This requires more work
Expand Down
1 change: 1 addition & 0 deletions crates/polars-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ description = "Physical expression implementation of the Polars project."
ahash = { workspace = true }
arrow = { workspace = true }
bitflags = { workspace = true }
dyn-clone = { workspace = true }
once_cell = { workspace = true }
polars-core = { workspace = true, features = ["lazy", "zip_with", "random"] }
polars-io = { workspace = true, features = ["lazy"] }
Expand Down
1 change: 1 addition & 0 deletions crates/polars-expr/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod expressions;
pub mod planner;
pub mod prelude;
pub mod reduce;
pub mod state;

pub use crate::planner::{create_physical_expr, ExpressionConversionState};
82 changes: 82 additions & 0 deletions crates/polars-expr/src/reduce/convert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use polars_core::error::feature_gated;
use polars_plan::prelude::*;
use polars_utils::arena::{Arena, Node};

use super::extrema::*;
use super::sum::SumReduce;
use super::*;
use crate::reduce::mean::MeanReduce;

/// Reports whether the expression stored at `node` is an aggregation that
/// this module treats as a reduction: `Min`, `Max`, `Mean` or `Sum`.
///
/// Every other expression, including other aggregation kinds, yields `false`.
pub fn can_convert_into_reduction(node: Node, expr_arena: &Arena<AExpr>) -> bool {
    matches!(
        expr_arena.get(node),
        AExpr::Agg(
            IRAggExpr::Min { .. }
                | IRAggExpr::Max { .. }
                | IRAggExpr::Mean { .. }
                | IRAggExpr::Sum(_)
        )
    )
}

/// Tries to turn the aggregation stored at `node` into a boxed [`Reduction`].
///
/// On success the result holds the reduction together with the node of the
/// aggregation's input expression. `Ok(None)` is returned when the expression
/// is not an aggregation, or is an aggregation other than `Sum`, `Min`, `Max`
/// or `Mean`.
///
/// `Min`/`Max` with `propagate_nans` set on a float output dtype use the
/// NaN-propagating reductions, which sit behind `feature_gated!("propagate_nans", ..)`;
/// what happens when that feature is disabled depends on the macro and is not
/// visible here.
///
/// # Errors
/// Propagates any error from resolving the expression's output field. The
/// field is resolved before the expression kind is inspected, so such an error
/// is returned even for expressions that would otherwise give `Ok(None)`.
pub fn into_reduction(
    node: Node,
    expr_arena: &Arena<AExpr>,
    schema: &Schema,
) -> PolarsResult<Option<(Box<dyn Reduction>, Node)>> {
    let aexpr = expr_arena.get(node);
    let field = aexpr.to_field(schema, Context::Default, expr_arena)?;
    let dtype = &field.dtype;

    let agg = match aexpr {
        AExpr::Agg(agg) => agg,
        _ => return Ok(None),
    };

    let reduction_and_input: (Box<dyn Reduction>, Node) = match agg {
        IRAggExpr::Sum(input) => {
            let reduction: Box<dyn Reduction> = Box::new(SumReduce::new(dtype.clone()));
            (reduction, *input)
        },
        IRAggExpr::Mean(input) => {
            let reduction: Box<dyn Reduction> = Box::new(MeanReduce::new(dtype.clone()));
            (reduction, *input)
        },
        IRAggExpr::Min {
            propagate_nans,
            input,
        } => {
            if !*propagate_nans || !dtype.is_float() {
                let reduction: Box<dyn Reduction> = Box::new(MinReduce::new(dtype.clone()));
                (reduction, *input)
            } else {
                feature_gated!("propagate_nans", {
                    let reduction: Box<dyn Reduction> = match dtype {
                        DataType::Float32 => Box::new(MinNanReduce::<Float32Type>::new()),
                        DataType::Float64 => Box::new(MinNanReduce::<Float64Type>::new()),
                        // Guarded by `is_float()` above.
                        _ => unreachable!(),
                    };
                    (reduction, *input)
                })
            }
        },
        IRAggExpr::Max {
            propagate_nans,
            input,
        } => {
            if !*propagate_nans || !dtype.is_float() {
                let reduction: Box<dyn Reduction> = Box::new(MaxReduce::new(dtype.clone()));
                (reduction, *input)
            } else {
                feature_gated!("propagate_nans", {
                    let reduction: Box<dyn Reduction> = match dtype {
                        DataType::Float32 => Box::new(MaxNanReduce::<Float32Type>::new()),
                        DataType::Float64 => Box::new(MaxNanReduce::<Float64Type>::new()),
                        // Guarded by `is_float()` above.
                        _ => unreachable!(),
                    };
                    (reduction, *input)
                })
            }
        },
        _ => return Ok(None),
    };

    Ok(Some(reduction_and_input))
}
Loading
Loading