diff --git a/crates/polars-error/src/lib.rs b/crates/polars-error/src/lib.rs index 63669029acbd..8c06452e3f89 100644 --- a/crates/polars-error/src/lib.rs +++ b/crates/polars-error/src/lib.rs @@ -55,11 +55,11 @@ pub enum PolarsError { ComputeError(ErrString), #[error("duplicate: {0}")] Duplicate(ErrString), - #[error("invalid operation: {0}")] + #[error("{0}")] InvalidOperation(ErrString), #[error("{}", match msg { - Some(msg) => format!("{}", msg), - None => format!("{}", error) + Some(msg) => format!("{}", msg), + None => format!("{}", error) })] IO { error: Arc, diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml index a93d4743f17c..91477915c381 100644 --- a/crates/polars-lazy/Cargo.toml +++ b/crates/polars-lazy/Cargo.toml @@ -250,6 +250,7 @@ binary_encoding = ["polars-plan/binary_encoding"] string_encoding = ["polars-plan/string_encoding"] bigidx = ["polars-plan/bigidx"] +polars_cloud = ["polars-plan/polars_cloud"] panic_on_schema = ["polars-plan/panic_on_schema", "polars-expr/panic_on_schema"] diff --git a/crates/polars-lazy/src/prelude.rs b/crates/polars-lazy/src/prelude.rs index edb1366e55aa..e81967b1a6e4 100644 --- a/crates/polars-lazy/src/prelude.rs +++ b/crates/polars-lazy/src/prelude.rs @@ -10,6 +10,8 @@ pub use polars_io::parquet::write::ParquetWriteOptions; pub use polars_ops::prelude::{JoinArgs, JoinType, JoinValidation}; #[cfg(feature = "rank")] pub use polars_ops::prelude::{RankMethod, RankOptions}; +#[cfg(feature = "polars_cloud")] +pub use polars_plan::client::assert_cloud_eligible; pub use polars_plan::plans::{ AnonymousScan, AnonymousScanArgs, AnonymousScanOptions, DslPlan, Literal, LiteralValue, Null, NULL, diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml index ede4f1530934..55de822d1a4a 100644 --- a/crates/polars-plan/Cargo.toml +++ b/crates/polars-plan/Cargo.toml @@ -179,6 +179,7 @@ month_end = ["polars-time/month_end"] offset_by = ["polars-time/offset_by"] bigidx = ["polars-core/bigidx"] +polars_cloud = [] panic_on_schema = [] diff --git a/crates/polars-plan/src/client/dsl.rs b/crates/polars-plan/src/client/dsl.rs new file mode 100644 index 000000000000..cbb4eb22b937 --- /dev/null +++ b/crates/polars-plan/src/client/dsl.rs @@ -0,0 +1,102 @@ +use crate::dsl::Expr; +use crate::prelude::DslPlan; + +impl DslPlan { + fn inputs<'a>(&'a self, scratch: &mut Vec<&'a DslPlan>) { + use DslPlan::*; + match self { + Select { input, .. } + | GroupBy { input, .. } + | Filter { input, .. } + | Distinct { input, .. } + | Sort { input, .. } + | Slice { input, .. } + | HStack { input, .. } + | MapFunction { input, .. } + | Sink { input, .. } + | Cache { input, .. } => scratch.push(input), + Union { inputs, .. } | HConcat { inputs, .. } => scratch.extend(inputs), + Join { + input_left, + input_right, + .. + } => { + scratch.push(input_left); + scratch.push(input_right); + }, + ExtContext { input, contexts } => { + scratch.push(input); + scratch.extend(contexts); + }, + IR { dsl, .. } => scratch.push(dsl), + Scan { .. } | DataFrameScan { .. } => (), + #[cfg(feature = "python")] + PythonScan { .. } => (), + } + } + + pub(super) fn get_expr<'a>(&'a self, scratch: &mut Vec<&'a Expr>) { + use DslPlan::*; + match self { + Filter { predicate, .. } => scratch.push(predicate), + Scan { predicate, .. } => { + if let Some(expr) = predicate { + scratch.push(expr) + } + }, + DataFrameScan { filter, .. } => { + if let Some(expr) = filter { + scratch.push(expr) + } + }, + Select { expr, .. } => scratch.extend(expr), + HStack { exprs, .. } => scratch.extend(exprs), + Sort { by_column, .. } => scratch.extend(by_column), + GroupBy { keys, aggs, .. } => { + scratch.extend(keys); + scratch.extend(aggs); + }, + Join { + left_on, right_on, .. + } => { + scratch.extend(left_on); + scratch.extend(right_on); + }, + Cache { .. } + | Distinct { .. } + | Slice { .. } + | MapFunction { .. } + | Union { .. } + | HConcat { .. } + | ExtContext { .. } + | Sink { .. } + | IR { .. } => (), + #[cfg(feature = "python")] + PythonScan { .. } => (), + } + } +} + +pub struct DslPlanIter<'a> { + stack: Vec<&'a DslPlan>, +} + +impl<'a> Iterator for DslPlanIter<'a> { + type Item = &'a DslPlan; + + fn next(&mut self) -> Option { + self.stack.pop().map(|next| { + next.inputs(&mut self.stack); + next + }) + } +} + +impl<'a> IntoIterator for &'a DslPlan { + type Item = &'a DslPlan; + type IntoIter = DslPlanIter<'a>; + + fn into_iter(self) -> Self::IntoIter { + DslPlanIter { stack: vec![self] } + } +} diff --git a/crates/polars-plan/src/client/mod.rs b/crates/polars-plan/src/client/mod.rs new file mode 100644 index 000000000000..d7b042ae59a5 --- /dev/null +++ b/crates/polars-plan/src/client/mod.rs @@ -0,0 +1,68 @@ +mod dsl; + +use polars_core::error::{polars_err, PolarsResult}; +use polars_io::path_utils::is_cloud_url; + +use crate::dsl::Expr; +use crate::plans::options::SinkType; +use crate::plans::{DslFunction, DslPlan, FunctionNode}; + +/// Assert that the given [`DslPlan`] is eligible to be executed on Polars Cloud. +pub fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> { + let mut expr_stack = vec![]; + for plan_node in dsl.into_iter() { + match plan_node { + DslPlan::MapFunction { + function: DslFunction::FunctionNode(function), + .. + } => match function { + FunctionNode::Opaque { .. } => return ineligible_error("contains opaque function"), + #[cfg(feature = "python")] + FunctionNode::OpaquePython { .. } => { + return ineligible_error("contains Python function") + }, + _ => (), + }, + #[cfg(feature = "python")] + DslPlan::PythonScan { .. } => return ineligible_error("contains Python scan"), + DslPlan::GroupBy { apply: Some(_), .. } => { + return ineligible_error("contains Python function in group by operation") + }, + DslPlan::Scan { paths, .. } + if paths.lock().unwrap().0.iter().any(|p| !is_cloud_url(p)) => + { + return ineligible_error("contains scan of local file system") + }, + DslPlan::Sink { payload, .. } => { + if !matches!(payload, SinkType::Cloud { .. }) { + return ineligible_error("contains sink to non-cloud location"); + } + }, + plan => { + plan.get_expr(&mut expr_stack); + + for expr in expr_stack.drain(..) { + for expr_node in expr.into_iter() { + match expr_node { + Expr::AnonymousFunction { .. } => { + return ineligible_error("contains anonymous function") + }, + Expr::RenameAlias { .. } => { + return ineligible_error("contains custom name remapping") + }, + _ => (), + } + } + } + }, + } + } + Ok(()) +} + +fn ineligible_error(message: &str) -> PolarsResult<()> { + Err(polars_err!( + InvalidOperation: + "logical plan ineligible for execution on Polars Cloud: {message}" + )) +} diff --git a/crates/polars-plan/src/lib.rs b/crates/polars-plan/src/lib.rs index 66ea5e34eece..a3b1b42808ab 100644 --- a/crates/polars-plan/src/lib.rs +++ b/crates/polars-plan/src/lib.rs @@ -12,4 +12,6 @@ pub mod plans; pub mod prelude; // Activate later // mod reduce; +#[cfg(feature = "polars_cloud")] +pub mod client; pub mod utils; diff --git a/crates/polars/Cargo.toml b/crates/polars/Cargo.toml index 49204000d01d..97bb99b279cf 100644 --- a/crates/polars/Cargo.toml +++ b/crates/polars/Cargo.toml @@ -129,7 +129,6 @@ approx_unique = ["polars-lazy?/approx_unique", "polars-ops/approx_unique"] arg_where = ["polars-lazy?/arg_where"] array_any_all = ["polars-lazy?/array_any_all", "dtype-array"] asof_join = ["polars-lazy?/asof_join", "polars-ops/asof_join"] -bigidx = ["polars-core/bigidx", "polars-lazy?/bigidx", "polars-ops/big_idx"] binary_encoding = ["polars-ops/binary_encoding", "polars-lazy?/binary_encoding", "polars-sql?/binary_encoding"] business = ["polars-lazy?/business", "polars-ops/business"] checked_arithmetic = ["polars-core/checked_arithmetic"] @@ -227,6 +226,9 @@ true_div = ["polars-lazy?/true_div"] unique_counts = ["polars-ops/unique_counts", "polars-lazy?/unique_counts"] zip_with = ["polars-core/zip_with"] +bigidx = ["polars-core/bigidx", "polars-lazy?/bigidx", "polars-ops/big_idx"] +polars_cloud = ["polars-lazy?/polars_cloud"] + test = [ "lazy", "rolling_window", diff --git a/py-polars/Cargo.toml b/py-polars/Cargo.toml index 17a8af3fe1b5..c399694d3f40 100644 --- a/py-polars/Cargo.toml +++ b/py-polars/Cargo.toml @@ -232,6 +232,8 @@ optimizations = [ "streaming", ] +polars_cloud = ["polars/polars_cloud"] + all = [ "optimizations", "io", @@ -244,6 +246,7 @@ all = [ "sql", "binary_encoding", "ffi_plugin", + "polars_cloud", # "new_streaming", ] diff --git a/py-polars/polars/_utils/cloud.py b/py-polars/polars/_utils/cloud.py new file mode 100644 index 000000000000..f722d1ab304b --- /dev/null +++ b/py-polars/polars/_utils/cloud.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import polars.polars as plr + +if TYPE_CHECKING: + from polars import LazyFrame + + +def assert_cloud_eligible(lf: LazyFrame) -> None: + """ + Assert that the given LazyFrame is eligible to be executed on Polars Cloud. + + The following conditions will disqualify a LazyFrame from being eligible: + + - Contains a user-defined function + - Scans a local filesystem + + Parameters + ---------- + lf + The LazyFrame to check. + + Raises + ------ + AssertionError + If the given LazyFrame is not eligible to be run on Polars Cloud. + """ + plr.assert_cloud_eligible(lf._ldf) diff --git a/py-polars/src/cloud.rs b/py-polars/src/cloud.rs new file mode 100644 index 000000000000..acd538a238d9 --- /dev/null +++ b/py-polars/src/cloud.rs @@ -0,0 +1,12 @@ +use pyo3::exceptions::PyAssertionError; +use pyo3::prelude::*; + +use crate::PyLazyFrame; + +#[pyfunction] +pub fn assert_cloud_eligible(lf: PyLazyFrame) -> PyResult<()> { + let plan = &lf.ldf.logical_plan; + polars::prelude::assert_cloud_eligible(plan) + .map_err(|e| PyAssertionError::new_err(e.to_string()))?; + Ok(()) +} diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs index 4e91dbb63e1f..eea76cbc7ad7 100644 --- a/py-polars/src/lib.rs +++ b/py-polars/src/lib.rs @@ -13,6 +13,8 @@ mod build { mod allocator; #[cfg(feature = "csv")] mod batched_csv; +#[cfg(feature = "polars_cloud")] +mod cloud; mod conversion; mod dataframe; mod datatypes; @@ -402,6 +404,11 @@ fn polars(py: Python, m: &Bound) -> PyResult<()> { ) .unwrap(); + // Cloud + #[cfg(feature = "polars_cloud")] + m.add_wrapped(wrap_pyfunction!(cloud::assert_cloud_eligible)) + .unwrap(); + // Build info m.add("__version__", env!("CARGO_PKG_VERSION"))?; #[cfg(feature = "build_info")] diff --git a/py-polars/tests/unit/cloud/__init__.py b/py-polars/tests/unit/cloud/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/py-polars/tests/unit/cloud/test_assert_cloud_eligible.py b/py-polars/tests/unit/cloud/test_assert_cloud_eligible.py new file mode 100644 index 000000000000..76730dc9c01a --- /dev/null +++ b/py-polars/tests/unit/cloud/test_assert_cloud_eligible.py @@ -0,0 +1,86 @@ +from pathlib import Path + +import pyarrow.dataset as ds +import pytest + +import polars as pl +from polars._utils.cloud import assert_cloud_eligible + +CLOUD_PATH = "s3://my-nonexistent-bucket/dataset" + + +@pytest.mark.parametrize( + "lf", + [ + pl.scan_parquet(CLOUD_PATH).select("c", pl.lit(2)).with_row_index(), + pl.LazyFrame({"a": [1, 2], "b": [3, 4]}) + .select("a", "b") + .filter(pl.col("a") == pl.lit(1)), + ], +) +def test_assert_cloud_eligible(lf: pl.LazyFrame) -> None: + assert_cloud_eligible(lf) + + +@pytest.mark.parametrize( + "lf", + [ + pl.LazyFrame({"a": [1, 2], "b": [3, 4]}).select( + pl.col("a").map_elements(lambda x: sum(x)) + ), + pl.LazyFrame({"a": [1, 2], "b": [3, 4]}).select( + pl.col("b").map_batches(lambda x: sum(x)) + ), + pl.LazyFrame({"a": [{"x": 1, "y": 2}]}).select( + pl.col("a").name.map(lambda x: x.upper()) + ), + pl.LazyFrame({"a": [{"x": 1, "y": 2}]}).select( + pl.col("a").name.map_fields(lambda x: x.upper()) + ), + pl.LazyFrame({"a": [1, 2], "b": [3, 4]}).map_batches(lambda x: x), + pl.LazyFrame({"a": [1, 2], "b": [3, 4]}) + .group_by("a") + .map_groups(lambda x: x, schema={"b": pl.Int64}), + pl.LazyFrame({"a": [1, 2], "b": [3, 4]}) + .group_by("a") + .agg(pl.col("b").map_batches(lambda x: sum(x))), + pl.scan_parquet(CLOUD_PATH).filter( + pl.col("a") < pl.lit(1).map_elements(lambda x: x + 1) + ), + ], +) +def test_assert_cloud_eligible_fail_on_udf(lf: pl.LazyFrame) -> None: + with pytest.raises( + AssertionError, match="logical plan ineligible for execution on Polars Cloud" + ): + assert_cloud_eligible(lf) + + +@pytest.mark.parametrize( + "lf", + [ + pl.scan_parquet("data.parquet"), + pl.scan_ndjson(Path("data.ndjson")), + pl.scan_csv("data-*.csv"), + pl.scan_ipc(["data-1.feather", "data-2.feather"]), + ], +) +def test_assert_cloud_eligible_fail_on_local_data_source(lf: pl.LazyFrame) -> None: + with pytest.raises( + AssertionError, match="logical plan ineligible for execution on Polars Cloud" + ): + assert_cloud_eligible(lf) + + +@pytest.mark.write_disk() +def test_assert_cloud_eligible_fail_on_python_scan(tmp_path: Path) -> None: + tmp_path.mkdir(exist_ok=True) + data_path = tmp_path / "data.parquet" + pl.DataFrame({"a": [1, 2]}).write_parquet(data_path) + dataset = ds.dataset(data_path, format="parquet") + + lf = pl.scan_pyarrow_dataset(dataset) + with pytest.raises( + AssertionError, match="logical plan ineligible for execution on Polars Cloud" + ): + assert_cloud_eligible(lf)