diff --git a/crates/polars-core/src/datatypes/aliases.rs b/crates/polars-core/src/datatypes/aliases.rs index b4ff2e9075c2..263598de0140 100644 --- a/crates/polars-core/src/datatypes/aliases.rs +++ b/crates/polars-core/src/datatypes/aliases.rs @@ -19,6 +19,8 @@ pub type IdxType = UInt32Type; #[cfg(feature = "bigidx")] pub type IdxType = UInt64Type; +pub use smartstring::alias::String as SmartString; + /// This hashmap uses an IdHasher pub type PlIdHashMap = hashbrown::HashMap; diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs b/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs index c03ad56615e6..c5bde87dca6c 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/mod.rs @@ -59,8 +59,8 @@ fn prepare_scan_args( /// Producer of an in memory DataFrame pub struct DataFrameExec { pub(crate) df: Arc, - pub(crate) selection: Option>, - pub(crate) projection: Option>, + pub(crate) filter: Option>, + pub(crate) projection: Option>, pub(crate) predicate_has_windows: bool, } @@ -72,10 +72,10 @@ impl Executor for DataFrameExec { // projection should be before selection as those are free // TODO: this is only the case if we don't create new columns if let Some(projection) = &self.projection { - df = df.select(projection.as_ref())?; + df = df.select(projection.as_slice())?; } - if let Some(selection) = &self.selection { + if let Some(selection) = &self.filter { if self.predicate_has_windows { state.insert_has_window_function_flag() } diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index e6e29d35a2af..cbd9c260a3c6 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -350,9 +350,9 @@ fn create_physical_plan_impl( }, DataFrameScan { df, - projection, filter: predicate, schema, + output_schema, .. } => { let mut state = ExpressionConversionState::new(true, state.expr_depth); @@ -369,8 +369,8 @@ fn create_physical_plan_impl( .transpose()?; Ok(Box::new(executors::DataFrameExec { df, - projection, - selection, + projection: output_schema.map(|s| s.iter_names().cloned().collect()), + filter: selection, predicate_has_windows: state.has_windows, })) }, diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index a6e92a6c9b6d..1dbc5d555dda 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -241,7 +241,6 @@ fn get_pipeline_node( df: Arc::new(DataFrame::empty()), schema: Arc::new(Schema::new()), output_schema: None, - projection: None, filter: None, }); diff --git a/crates/polars-lazy/src/tests/cse.rs b/crates/polars-lazy/src/tests/cse.rs index ae4ceaa70243..c8cb65564ffd 100644 --- a/crates/polars-lazy/src/tests/cse.rs +++ b/crates/polars-lazy/src/tests/cse.rs @@ -109,7 +109,7 @@ fn test_cse_cache_union_projection_pd() -> PolarsResult<()> { true }, DataFrameScan { - projection: Some(projection), + output_schema: Some(projection), .. } => projection.as_ref().len() <= 2, DataFrameScan { .. } => false, diff --git a/crates/polars-lazy/src/tests/optimization_checks.rs b/crates/polars-lazy/src/tests/optimization_checks.rs index 5d96890fb66e..e11c0b5014cf 100644 --- a/crates/polars-lazy/src/tests/optimization_checks.rs +++ b/crates/polars-lazy/src/tests/optimization_checks.rs @@ -480,11 +480,10 @@ fn test_with_column_prune() -> PolarsResult<()> { (&lp_arena).iter(lp).for_each(|(_, lp)| { use IR::*; match lp { - DataFrameScan { projection, .. } => { - let projection = projection.as_ref().unwrap(); - let projection = projection.as_ref(); + DataFrameScan { output_schema, .. } => { + let projection = output_schema.as_ref().unwrap(); assert_eq!(projection.len(), 1); - let name = &projection[0]; + let name = projection.get_at_index(0).unwrap().0; assert_eq!(name, "c1"); }, HStack { exprs, .. } => { @@ -503,7 +502,7 @@ fn test_with_column_prune() -> PolarsResult<()> { assert!((&lp_arena).iter(lp).all(|(_, lp)| { use IR::*; - matches!(lp, IR::SimpleProjection { .. } | DataFrameScan { .. }) + matches!(lp, SimpleProjection { .. } | DataFrameScan { .. }) })); assert_eq!( q.schema().unwrap().as_ref(), diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index bcd47c59d122..5d6e65774241 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -53,7 +53,6 @@ where match source { DataFrameScan { df, - projection, filter: selection, output_schema, .. @@ -67,8 +66,9 @@ where operator_objects.push(op) } // projection is free - if let Some(projection) = projection { - df = df.select(projection.as_ref())?; + if let Some(schema) = output_schema { + let columns = schema.iter_names().cloned().collect::>(); + df = df._select_impl_unchecked(&columns)?; } } Ok(Box::new(sources::DataFrameSource::from_df(df)) as Box) diff --git a/crates/polars-plan/src/logical_plan/builder_dsl.rs b/crates/polars-plan/src/logical_plan/builder_dsl.rs index bb3ef85c666b..2f52997a7ede 100644 --- a/crates/polars-plan/src/logical_plan/builder_dsl.rs +++ b/crates/polars-plan/src/logical_plan/builder_dsl.rs @@ -312,7 +312,6 @@ impl DslBuilder { df: Arc::new(df), schema, output_schema: None, - projection: None, filter: None, } .into() diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs index 651a09369093..a0feae3e11d9 100644 --- a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs @@ -19,7 +19,6 @@ fn empty_df() -> IR { df: Arc::new(Default::default()), schema: Arc::new(Default::default()), output_schema: None, - projection: None, filter: None, } } @@ -203,13 +202,11 @@ pub fn to_alp_impl( df, schema, output_schema, - projection, filter: selection, } => IR::DataFrameScan { df, schema, output_schema, - projection, filter: selection.map(|expr| to_expr_ir(expr, expr_arena)), }, DslPlan::Select { diff --git a/crates/polars-plan/src/logical_plan/conversion/mod.rs b/crates/polars-plan/src/logical_plan/conversion/mod.rs index e9b5664ae019..29a165111046 100644 --- a/crates/polars-plan/src/logical_plan/conversion/mod.rs +++ b/crates/polars-plan/src/logical_plan/conversion/mod.rs @@ -99,13 +99,11 @@ impl IR { df, schema, output_schema, - projection, filter: selection, } => DslPlan::DataFrameScan { df, schema, output_schema, - projection, filter: selection.map(|e| e.to_expr(expr_arena)), }, IR::Select { diff --git a/crates/polars-plan/src/logical_plan/ir/dot.rs b/crates/polars-plan/src/logical_plan/ir/dot.rs index 4c3d7202cb42..55fc9a99afa0 100644 --- a/crates/polars-plan/src/logical_plan/ir/dot.rs +++ b/crates/polars-plan/src/logical_plan/ir/dot.rs @@ -1,6 +1,8 @@ use std::fmt; use std::path::PathBuf; +use polars_core::schema::Schema; + use super::format::ExprIRSliceDisplay; use crate::constants::UNLIMITED_CACHE; use crate::prelude::ir::format::ColumnsDisplay; @@ -227,11 +229,11 @@ impl<'a> IRDotDisplay<'a> { }, DataFrameScan { schema, - projection, + output_schema, filter: selection, .. } => { - let num_columns = NumColumns(projection.as_ref().map(|p| p.as_ref())); + let num_columns = NumColumnsSchema(output_schema.as_ref().map(|p| p.as_ref())); let selection = selection.as_ref().map(|e| self.display_expr(e)); let selection = OptionExprIRDisplay(selection); let total_columns = schema.len(); @@ -331,6 +333,7 @@ impl<'a> IRDotDisplay<'a> { // A few utility structures for formatting struct PathsDisplay<'a>(&'a [PathBuf]); struct NumColumns<'a>(Option<&'a [String]>); +struct NumColumnsSchema<'a>(Option<&'a Schema>); struct OptionExprIRDisplay<'a>(Option>); impl fmt::Display for PathsDisplay<'_> { @@ -357,6 +360,15 @@ impl fmt::Display for NumColumns<'_> { } } +impl fmt::Display for NumColumnsSchema<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0 { + None => f.write_str("*"), + Some(columns) => columns.len().fmt(f), + } + } +} + impl fmt::Display for OptionExprIRDisplay<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.0 { diff --git a/crates/polars-plan/src/logical_plan/ir/format.rs b/crates/polars-plan/src/logical_plan/ir/format.rs index 9f215798e8f8..78ecbb034ad6 100644 --- a/crates/polars-plan/src/logical_plan/ir/format.rs +++ b/crates/polars-plan/src/logical_plan/ir/format.rs @@ -249,12 +249,12 @@ impl<'a> IRDisplay<'a> { }, DataFrameScan { schema, - projection, + output_schema, filter: selection, .. } => { let total_columns = schema.len(); - let n_columns = if let Some(columns) = projection { + let n_columns = if let Some(columns) = output_schema { columns.len().to_string() } else { "*".to_string() diff --git a/crates/polars-plan/src/logical_plan/ir/inputs.rs b/crates/polars-plan/src/logical_plan/ir/inputs.rs index e3b829bd6505..7d1bf84bb8a5 100644 --- a/crates/polars-plan/src/logical_plan/ir/inputs.rs +++ b/crates/polars-plan/src/logical_plan/ir/inputs.rs @@ -126,7 +126,6 @@ impl IR { df, schema, output_schema, - projection, filter: selection, } => { let mut new_selection = None; @@ -138,7 +137,6 @@ impl IR { df: df.clone(), schema: schema.clone(), output_schema: output_schema.clone(), - projection: projection.clone(), filter: new_selection, } }, diff --git a/crates/polars-plan/src/logical_plan/ir/mod.rs b/crates/polars-plan/src/logical_plan/ir/mod.rs index a288c8c7c0ff..8f7cbd9089cc 100644 --- a/crates/polars-plan/src/logical_plan/ir/mod.rs +++ b/crates/polars-plan/src/logical_plan/ir/mod.rs @@ -60,9 +60,11 @@ pub enum IR { DataFrameScan { df: Arc, schema: SchemaRef, - // schema of the projected file + // Schema of the projected file + // If `None`, no projection is applied output_schema: Option, - projection: Option>, + // Predicate to apply on the DataFrame + // All the columns required for the predicate are projected. filter: Option, }, // Only selects columns (semantically only has row access). diff --git a/crates/polars-plan/src/logical_plan/ir/tree_format.rs b/crates/polars-plan/src/logical_plan/ir/tree_format.rs index ca4bf5b8edd5..fae4302d5b26 100644 --- a/crates/polars-plan/src/logical_plan/ir/tree_format.rs +++ b/crates/polars-plan/src/logical_plan/ir/tree_format.rs @@ -192,7 +192,7 @@ impl<'a> TreeFmtNode<'a> { Scan { .. } => ND(wh(h, &lp.describe()), vec![]), DataFrameScan { schema, - projection, + output_schema, filter: selection, .. } => ND( @@ -201,7 +201,7 @@ impl<'a> TreeFmtNode<'a> { &format!( "DF {:?}\nPROJECT {}/{} COLUMNS", schema.iter_names().take(4).collect::>(), - if let Some(columns) = projection { + if let Some(columns) = output_schema { format!("{}", columns.len()) } else { "*".to_string() diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index 080d56ff9d11..0086c105c4fa 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -91,7 +91,6 @@ pub enum DslPlan { schema: SchemaRef, // schema of the projected file output_schema: Option, - projection: Option>, filter: Option, }, /// Polars' `select` operation, this can mean projection, but also full data access. @@ -188,7 +187,7 @@ impl Clone for DslPlan { Self::Filter { input, predicate } => Self::Filter { input: input.clone(), predicate: predicate.clone() }, Self::Cache { input, id, cache_hits } => Self::Cache { input: input.clone(), id: id.clone(), cache_hits: cache_hits.clone() }, Self::Scan { paths, file_info, predicate, file_options, scan_type } => Self::Scan { paths: paths.clone(), file_info: file_info.clone(), predicate: predicate.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() }, - Self::DataFrameScan { df, schema, output_schema, projection, filter: selection } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), output_schema: output_schema.clone(), projection: projection.clone(), filter: selection.clone() }, + Self::DataFrameScan { df, schema, output_schema, filter: selection } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), output_schema: output_schema.clone(), filter: selection.clone() }, Self::Select { expr, input, options } => Self::Select { expr: expr.clone(), input: input.clone(), options: options.clone() }, Self::GroupBy { input, keys, aggs, apply, maintain_order, options } => Self::GroupBy { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() }, Self::Join { input_left, input_right, left_on, right_on, options } => Self::Join { input_left: input_left.clone(), input_right: input_right.clone(), left_on: left_on.clone(), right_on: right_on.clone(), options: options.clone() }, @@ -214,7 +213,6 @@ impl Default for DslPlan { df: Arc::new(df), schema: Arc::new(schema), output_schema: None, - projection: None, filter: None, } } diff --git a/crates/polars-plan/src/logical_plan/optimizer/count_star.rs b/crates/polars-plan/src/logical_plan/optimizer/count_star.rs index 4d1c3c7073af..d971b10bcf02 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/count_star.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/count_star.rs @@ -25,7 +25,6 @@ impl OptimizationRule for CountStar { df: Arc::new(Default::default()), schema: Arc::new(Default::default()), output_schema: None, - projection: None, filter: None, }; let placeholder_node = lp_arena.add(placeholder); diff --git a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs index bcba865a9c0d..e6899af7a4d1 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs @@ -307,7 +307,6 @@ impl<'a> PredicatePushDown<'a> { df, schema, output_schema, - projection, filter: selection, } => { let selection = predicate_at_scan(acc_predicates, selection, expr_arena); @@ -315,7 +314,6 @@ impl<'a> PredicatePushDown<'a> { df, schema, output_schema, - projection, filter: selection, }; Ok(lp) @@ -390,7 +388,6 @@ impl<'a> PredicatePushDown<'a> { df: Arc::new(df), schema: schema.clone(), output_schema: None, - projection: None, filter: None, }); } else { diff --git a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs index 0a44d989afd8..283ebd611c4d 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs @@ -357,7 +357,6 @@ impl ProjectionPushDown { filter: selection, .. } => { - let mut projection = None; if !acc_projections.is_empty() { output_schema = Some(Arc::new(update_scan_schema( &acc_projections, @@ -365,13 +364,11 @@ impl ProjectionPushDown { &schema, false, )?)); - projection = get_scan_columns(&mut acc_projections, expr_arena, None); } let lp = DataFrameScan { df, schema, output_schema, - projection, filter: selection, }; Ok(lp) diff --git a/crates/polars-plan/src/logical_plan/visitor/hash.rs b/crates/polars-plan/src/logical_plan/visitor/hash.rs index d040def75435..4538375b8a2c 100644 --- a/crates/polars-plan/src/logical_plan/visitor/hash.rs +++ b/crates/polars-plan/src/logical_plan/visitor/hash.rs @@ -90,12 +90,11 @@ impl Hash for HashableEqLP<'_> { IR::DataFrameScan { df, schema: _, - output_schema: _, - projection, + output_schema, filter: selection, } => { (Arc::as_ptr(df) as usize).hash(state); - projection.hash(state); + output_schema.hash(state); hash_option_expr(selection, self.expr_arena, state); }, IR::SimpleProjection { columns, input: _ } => { @@ -279,20 +278,18 @@ impl HashableEqLP<'_> { IR::DataFrameScan { df: dfl, schema: _, - output_schema: _, - projection: pl, + output_schema: s_l, filter: sl, }, IR::DataFrameScan { df: dfr, schema: _, - output_schema: _, - projection: pr, + output_schema: s_r, filter: sr, }, ) => { Arc::as_ptr(dfl) == Arc::as_ptr(dfr) - && pl == pr + && s_l == s_r && opt_expr_ir_eq(sl, sr, self.expr_arena) }, ( diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index f867e15789ea..47b6036e4bfe 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -26,7 +26,6 @@ pub fn lower_ir( IR::DataFrameScan { df, output_schema, - projection, filter, .. } => { @@ -38,12 +37,10 @@ pub fn lower_ir( let mut phys_node = phys_sm.insert(PhysNode::DataFrameScan { df: df.clone() }); - if projection.is_some() { - // TODO: normalize output_schema <-> projection so we don't have to unwrap here. - let schema = output_schema.clone().unwrap(); + if let Some(schema) = output_schema { phys_node = phys_sm.insert(PhysNode::SimpleProjection { input: phys_node, - schema, + schema: schema.clone(), }) } diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 6cf6e3a46835..9fec88719fee 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -607,7 +607,7 @@ def serialize(self, file: IOBase | str | Path | None = None) -> str | None: >>> lf = pl.LazyFrame({"a": [1, 2, 3]}).sum() >>> json = lf.serialize() >>> json - '{"MapFunction":{"input":{"DataFrameScan":{"df":{"columns":[{"name":"a","datatype":"Int64","bit_settings":"","values":[1,2,3]}]},"schema":{"inner":{"a":"Int64"}},"output_schema":null,"projection":null,"filter":null}},"function":{"Stats":"Sum"}}}' + '{"MapFunction":{"input":{"DataFrameScan":{"df":{"columns":[{"name":"a","datatype":"Int64","bit_settings":"","values":[1,2,3]}]},"schema":{"inner":{"a":"Int64"}},"output_schema":null,"filter":null}},"function":{"Stats":"Sum"}}}' The logical plan can later be deserialized back into a LazyFrame. diff --git a/py-polars/src/lazyframe/visitor/nodes.rs b/py-polars/src/lazyframe/visitor/nodes.rs index cec4752d2a4b..538e4d97ea1d 100644 --- a/py-polars/src/lazyframe/visitor/nodes.rs +++ b/py-polars/src/lazyframe/visitor/nodes.rs @@ -320,14 +320,19 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult { IR::DataFrameScan { df, schema: _, - output_schema: _, - projection, + output_schema, filter: selection, } => DataFrameScan { df: PyDataFrame::new((**df).clone()), - projection: projection - .as_ref() - .map_or_else(|| py.None(), |f| f.to_object(py)), + projection: output_schema.as_ref().map_or_else( + || py.None(), + |s| { + s.iter_names() + .map(|s| s.as_str()) + .collect::>() + .to_object(py) + }, + ), selection: selection.as_ref().map(|e| e.into()), } .into_py(py),