From 92babdd1ab07d3eafcdcbb362772827e460a29e2 Mon Sep 17 00:00:00 2001 From: ritchie Date: Tue, 28 May 2024 11:48:36 +0200 Subject: [PATCH 1/3] WIP: schema cache [skip ci] --- crates/polars-lazy/src/frame/mod.rs | 104 +++++++++++++++--- .../src/tests/optimization_checks.rs | 2 +- .../src/logical_plan/conversion/dsl_to_ir.rs | 9 ++ crates/polars-plan/src/logical_plan/mod.rs | 7 ++ crates/polars-plan/src/logical_plan/schema.rs | 18 +-- crates/polars-sql/src/context.rs | 10 +- crates/polars-sql/src/sql_expr.rs | 2 +- crates/polars-utils/src/arena.rs | 1 + py-polars/src/lazyframe/mod.rs | 10 +- 9 files changed, 125 insertions(+), 38 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 03e48a059183..9f1cfb194bfd 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -15,7 +15,7 @@ pub mod pivot; feature = "json" ))] use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; pub use anonymous_scan::*; #[cfg(feature = "csv")] @@ -55,6 +55,7 @@ impl IntoLazy for DataFrame { LazyFrame { logical_plan: lp, opt_state: Default::default(), + cached_arena: Default::default(), } } } @@ -65,6 +66,11 @@ impl IntoLazy for LazyFrame { } } +struct CachedArena { + lp_arena: Arena, + expr_arena: Arena, +} + /// Lazy abstraction over an eager `DataFrame`. /// It really is an abstraction over a logical plan. The methods of this struct will incrementally /// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)). @@ -73,6 +79,7 @@ impl IntoLazy for LazyFrame { pub struct LazyFrame { pub logical_plan: DslPlan, pub(crate) opt_state: OptState, + cached_arena: Arc>>, } impl From for LazyFrame { @@ -83,6 +90,7 @@ impl From for LazyFrame { file_caching: true, ..Default::default() }, + cached_arena: Default::default(), } } } @@ -93,8 +101,63 @@ impl LazyFrame { /// /// Returns an `Err` if the logical plan has already encountered an error (i.e., if /// `self.collect()` would fail), `Ok` otherwise. - pub fn schema(&self) -> PolarsResult { - self.logical_plan.compute_schema() + pub fn schema(&mut self) -> PolarsResult { + let mut cached_arenas = self.cached_arena.lock().unwrap(); + + match &mut *cached_arenas { + None => { + let mut lp_arena = Default::default(); + let mut expr_arena = Default::default(); + let node = to_alp( + self.logical_plan.clone(), + &mut expr_arena, + &mut lp_arena, + false, + true, + )?; + + let schema = lp_arena.get(node).schema(&lp_arena).into_owned(); + + // Cache the logical plan and the arenas, so that next schema call is cheap. + self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())}; + dbg!("set", lp_arena.len()); + *cached_arenas = Some(CachedArena { + lp_arena, + expr_arena, + }); + + Ok(schema) + }, + Some(arenas) => { + match self.logical_plan { + // We have got arenas and don't need to convert the DSL. + DslPlan::IR {node: Some(node), ..} => { + Ok(arenas + .lp_arena + .get(node) + .schema(&arenas.lp_arena) + .into_owned()) + }, + _ => { + // We have got arenas, but still need to convert (parts) of the DSL. + let node = to_alp( + self.logical_plan.clone(), + &mut arenas.expr_arena, + &mut arenas.lp_arena, + false, + true, + )?; + + dbg!("set", arenas.lp_arena.len()); + let schema = arenas.lp_arena.get(node).schema(&arenas.lp_arena).into_owned(); + // Cache the logical plan so that next schema call is cheap. + self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())}; + + Ok(schema) + } + } + }, + } } pub(crate) fn get_plan_builder(self) -> DslBuilder { @@ -109,6 +172,7 @@ impl LazyFrame { LazyFrame { logical_plan, opt_state, + cached_arena: Default::default(), } } @@ -516,17 +580,19 @@ impl LazyFrame { self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false) } - pub fn to_alp_optimized(self) -> PolarsResult { - let mut lp_arena = Arena::with_capacity(16); - let mut expr_arena = Arena::with_capacity(16); + pub fn to_alp_optimized(mut self) -> PolarsResult { + let (mut lp_arena, mut expr_arena) = self.get_arenas(); let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?; Ok(IRPlan::new(node, lp_arena, expr_arena)) } - pub fn to_alp(self) -> PolarsResult { - self.logical_plan.to_alp() + pub fn to_alp(mut self) -> PolarsResult { + let (mut lp_arena, mut expr_arena) = self.get_arenas(); + let node = to_alp(self.logical_plan, &mut expr_arena, &mut lp_arena, true, true)?; + let plan = IRPlan::new(node, lp_arena, expr_arena); + Ok(plan) } pub(crate) fn optimize_with_scratch( @@ -587,16 +653,27 @@ impl LazyFrame { Ok(lp_top) } + fn get_arenas(&mut self) -> (Arena, Arena) { + match self.cached_arena.lock().unwrap().as_mut() { + Some(arenas) => { + (arenas.lp_arena.clone(), arenas.expr_arena.clone()) + }, + None => { + (Arena::with_capacity(16), Arena::with_capacity(16)) + } + } + } + fn prepare_collect_post_opt

( - self, + mut self, check_sink: bool, post_opt: P, ) -> PolarsResult<(ExecutionState, Box, bool)> where P: Fn(Node, &mut Arena, &mut Arena) -> PolarsResult<()>, { - let mut expr_arena = Arena::with_capacity(16); - let mut lp_arena = Arena::with_capacity(16); + let (mut lp_arena, mut expr_arena) = self.get_arenas(); + let mut scratch = vec![]; let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?; @@ -935,7 +1012,7 @@ impl LazyFrame { /// *group_by_dynamic* #[cfg(feature = "dynamic_group_by")] pub fn rolling>( - self, + mut self, index_column: Expr, group_by: E, mut options: RollingGroupOptions, @@ -980,7 +1057,7 @@ impl LazyFrame { /// with a ordinary group_by on these keys. #[cfg(feature = "dynamic_group_by")] pub fn group_by_dynamic>( - self, + mut self, index_column: Expr, group_by: E, mut options: DynamicGroupOptions, @@ -1672,6 +1749,7 @@ impl From for LazyFrame { Self { logical_plan: lgb.logical_plan, opt_state: lgb.opt_state, + cached_arena: Default::default(), } } } diff --git a/crates/polars-lazy/src/tests/optimization_checks.rs b/crates/polars-lazy/src/tests/optimization_checks.rs index 2bb0a0727dd3..293496c52f6b 100644 --- a/crates/polars-lazy/src/tests/optimization_checks.rs +++ b/crates/polars-lazy/src/tests/optimization_checks.rs @@ -485,7 +485,7 @@ fn test_with_column_prune() -> PolarsResult<()> { }); // whole `with_columns` pruned - let q = df.lazy().with_column(col("c0")).select([col("c1")]); + let mut q = df.lazy().with_column(col("c0")).select([col("c1")]); let lp = q.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap(); diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs index 86754f11d02a..24c0aa4b3539 100644 --- a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs @@ -79,6 +79,7 @@ pub fn to_alp_impl( Ok(lp_node) } + dbg!(lp_arena.len()); let v = match lp { DslPlan::Scan { @@ -565,6 +566,14 @@ pub fn to_alp_impl( .map_err(|e| e.context(failed_input!(sink)))?; IR::Sink { input, payload } }, + DslPlan::IR { node, dsl } => { + dbg!("IR", lp_arena.len()); + return if let Some(node) = node { + Ok(node) + } else { + to_alp_impl(owned(dsl), expr_arena, lp_arena, convert) + } + }, }; Ok(lp_arena.add(v)) } diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index a52115fcd01d..e0ebebb1a212 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -167,6 +167,12 @@ pub enum DslPlan { input: Arc, payload: SinkType, }, + IR { + #[cfg_attr(feature = "serde", serde(skip))] + node: Option, + // Keep the original Dsl around as we need that for serialization. + dsl: Arc, + }, } impl Clone for DslPlan { @@ -195,6 +201,7 @@ impl Clone for DslPlan { Self::HConcat { inputs, options } => Self::HConcat { inputs: inputs.clone(), options: options.clone() }, Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() }, Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() }, + Self::IR {node, dsl} => Self::IR {node: *node, dsl: dsl.clone()} } } } diff --git a/crates/polars-plan/src/logical_plan/schema.rs b/crates/polars-plan/src/logical_plan/schema.rs index f3e3eabddf19..e589f95a8b64 100644 --- a/crates/polars-plan/src/logical_plan/schema.rs +++ b/crates/polars-plan/src/logical_plan/schema.rs @@ -18,22 +18,10 @@ impl DslPlan { /// Compute the schema. This requires conversion to [`IR`] and type-resolving. pub fn compute_schema(&self) -> PolarsResult { - let opt_state = OptState { - eager: true, - type_coercion: true, - simplify_expr: false, - ..Default::default() - }; - let mut lp_arena = Default::default(); - let node = optimize( - self.clone(), - opt_state, - &mut lp_arena, - &mut Default::default(), - &mut Default::default(), - Default::default(), - )?; + let mut expr_arena = Default::default(); + let node = to_alp(self.clone(), &mut expr_arena, &mut lp_arena, false, true)?; + Ok(lp_arena.get(node).schema(&lp_arena).into_owned()) } } diff --git a/crates/polars-sql/src/context.rs b/crates/polars-sql/src/context.rs index 46e1c055f645..0b2de194a83a 100644 --- a/crates/polars-sql/src/context.rs +++ b/crates/polars-sql/src/context.rs @@ -694,7 +694,11 @@ impl SQLContext { Ok((tbl_name, lf)) } - fn process_order_by(&mut self, lf: LazyFrame, ob: &[OrderByExpr]) -> PolarsResult { + fn process_order_by( + &mut self, + mut lf: LazyFrame, + ob: &[OrderByExpr], + ) -> PolarsResult { let mut by = Vec::with_capacity(ob.len()); let mut descending = Vec::with_capacity(ob.len()); @@ -718,7 +722,7 @@ impl SQLContext { fn process_group_by( &mut self, - lf: LazyFrame, + mut lf: LazyFrame, contains_wildcard: bool, group_by_keys: &[Expr], projections: &[Expr], @@ -846,7 +850,7 @@ impl SQLContext { let idents = idents.as_slice(); let e = match idents { [tbl_name] => { - let lf = self.table_map.get(&tbl_name.value).ok_or_else(|| { + let lf = self.table_map.get_mut(&tbl_name.value).ok_or_else(|| { polars_err!( ComputeError: "no table named '{}' found", tbl_name diff --git a/crates/polars-sql/src/sql_expr.rs b/crates/polars-sql/src/sql_expr.rs index 480b0cce23c3..c18b79eb4f87 100644 --- a/crates/polars-sql/src/sql_expr.rs +++ b/crates/polars-sql/src/sql_expr.rs @@ -338,7 +338,7 @@ impl SQLExprVisitor<'_> { fn visit_compound_identifier(&self, idents: &[Ident]) -> PolarsResult { match idents { [tbl_name, column_name] => { - let lf = self + let mut lf = self .ctx .get_table_from_current_scope(&tbl_name.value) .ok_or_else(|| { diff --git a/crates/polars-utils/src/arena.rs b/crates/polars-utils/src/arena.rs index a0073b6598cf..2e4505c3b060 100644 --- a/crates/polars-utils/src/arena.rs +++ b/crates/polars-utils/src/arena.rs @@ -79,6 +79,7 @@ impl Arena { #[inline] pub fn get(&self, idx: Node) -> &T { + dbg!(idx.0); self.items.get(idx.0).unwrap() } diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index ec5461398c28..7aa96c9640a9 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -33,7 +33,7 @@ pub struct PyLazyFrame { } impl PyLazyFrame { - fn get_schema(&self) -> PyResult { + fn get_schema(&mut self) -> PyResult { let schema = self.ldf.schema().map_err(PyPolarsErr::from)?; Ok(schema) } @@ -1165,13 +1165,13 @@ impl PyLazyFrame { self.ldf.clone().into() } - fn columns(&self, py: Python) -> PyResult { + fn columns(&mut self, py: Python) -> PyResult { let schema = self.get_schema()?; let iter = schema.iter_names().map(|s| s.as_str()); Ok(PyList::new_bound(py, iter).to_object(py)) } - fn dtypes(&self, py: Python) -> PyResult { + fn dtypes(&mut self, py: Python) -> PyResult { let schema = self.get_schema()?; let iter = schema .iter_dtypes() @@ -1179,7 +1179,7 @@ impl PyLazyFrame { Ok(PyList::new_bound(py, iter).to_object(py)) } - fn schema(&self, py: Python) -> PyResult { + fn schema(&mut self, py: Python) -> PyResult { let schema = self.get_schema()?; let schema_dict = PyDict::new_bound(py); @@ -1195,7 +1195,7 @@ impl PyLazyFrame { self.ldf.clone().unnest(columns).into() } - fn width(&self) -> PyResult { + fn width(&mut self) -> PyResult { Ok(self.get_schema()?.len()) } From a7b4dfdcd98f7661c7d6bce2013059880b94a675 Mon Sep 17 00:00:00 2001 From: ritchie Date: Tue, 28 May 2024 14:05:52 +0200 Subject: [PATCH 2/3] partial schema cache --- crates/polars-lazy/src/frame/mod.rs | 62 +++++++++++-------- .../src/logical_plan/conversion/dsl_to_ir.rs | 6 +- crates/polars-plan/src/logical_plan/mod.rs | 3 +- crates/polars-utils/src/arena.rs | 17 ++++- 4 files changed, 56 insertions(+), 32 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 9f1cfb194bfd..66a757721872 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -66,11 +66,6 @@ impl IntoLazy for LazyFrame { } } -struct CachedArena { - lp_arena: Arena, - expr_arena: Arena, -} - /// Lazy abstraction over an eager `DataFrame`. /// It really is an abstraction over a logical plan. The methods of this struct will incrementally /// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)). @@ -95,6 +90,11 @@ impl From for LazyFrame { } } +struct CachedArena { + lp_arena: Arena, + expr_arena: Arena, +} + impl LazyFrame { /// Get a handle to the schema — a map from column names to data types — of the current /// `LazyFrame` computation. @@ -119,8 +119,11 @@ impl LazyFrame { let schema = lp_arena.get(node).schema(&lp_arena).into_owned(); // Cache the logical plan and the arenas, so that next schema call is cheap. - self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())}; - dbg!("set", lp_arena.len()); + self.logical_plan = DslPlan::IR { + node: Some(node), + dsl: Arc::new(self.logical_plan.clone()), + version: lp_arena.version(), + }; *cached_arenas = Some(CachedArena { lp_arena, expr_arena, @@ -131,13 +134,13 @@ impl LazyFrame { Some(arenas) => { match self.logical_plan { // We have got arenas and don't need to convert the DSL. - DslPlan::IR {node: Some(node), ..} => { - Ok(arenas - .lp_arena - .get(node) - .schema(&arenas.lp_arena) - .into_owned()) - }, + DslPlan::IR { + node: Some(node), .. + } => Ok(arenas + .lp_arena + .get(node) + .schema(&arenas.lp_arena) + .into_owned()), _ => { // We have got arenas, but still need to convert (parts) of the DSL. let node = to_alp( @@ -148,13 +151,20 @@ impl LazyFrame { true, )?; - dbg!("set", arenas.lp_arena.len()); - let schema = arenas.lp_arena.get(node).schema(&arenas.lp_arena).into_owned(); + let schema = arenas + .lp_arena + .get(node) + .schema(&arenas.lp_arena) + .into_owned(); // Cache the logical plan so that next schema call is cheap. - self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())}; + self.logical_plan = DslPlan::IR { + node: Some(node), + dsl: Arc::new(self.logical_plan.clone()), + version: arenas.lp_arena.version(), + }; Ok(schema) - } + }, } }, } @@ -590,7 +600,13 @@ impl LazyFrame { pub fn to_alp(mut self) -> PolarsResult { let (mut lp_arena, mut expr_arena) = self.get_arenas(); - let node = to_alp(self.logical_plan, &mut expr_arena, &mut lp_arena, true, true)?; + let node = to_alp( + self.logical_plan, + &mut expr_arena, + &mut lp_arena, + true, + true, + )?; let plan = IRPlan::new(node, lp_arena, expr_arena); Ok(plan) } @@ -655,12 +671,8 @@ impl LazyFrame { fn get_arenas(&mut self) -> (Arena, Arena) { match self.cached_arena.lock().unwrap().as_mut() { - Some(arenas) => { - (arenas.lp_arena.clone(), arenas.expr_arena.clone()) - }, - None => { - (Arena::with_capacity(16), Arena::with_capacity(16)) - } + Some(arenas) => (arenas.lp_arena.clone(), arenas.expr_arena.clone()), + None => (Arena::with_capacity(16), Arena::with_capacity(16)), } } diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs index 24c0aa4b3539..786d22f06d37 100644 --- a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs @@ -79,7 +79,6 @@ pub fn to_alp_impl( Ok(lp_node) } - dbg!(lp_arena.len()); let v = match lp { DslPlan::Scan { @@ -566,9 +565,8 @@ pub fn to_alp_impl( .map_err(|e| e.context(failed_input!(sink)))?; IR::Sink { input, payload } }, - DslPlan::IR { node, dsl } => { - dbg!("IR", lp_arena.len()); - return if let Some(node) = node { + DslPlan::IR { node, dsl, version } => { + return if let (true, Some(node)) = (version == lp_arena.version(), node) { Ok(node) } else { to_alp_impl(owned(dsl), expr_arena, lp_arena, convert) diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index e0ebebb1a212..313d6cbe6644 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -170,6 +170,7 @@ pub enum DslPlan { IR { #[cfg_attr(feature = "serde", serde(skip))] node: Option, + version: u32, // Keep the original Dsl around as we need that for serialization. dsl: Arc, }, @@ -201,7 +202,7 @@ impl Clone for DslPlan { Self::HConcat { inputs, options } => Self::HConcat { inputs: inputs.clone(), options: options.clone() }, Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() }, Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() }, - Self::IR {node, dsl} => Self::IR {node: *node, dsl: dsl.clone()} + Self::IR {node, dsl, version} => Self::IR {node: *node, dsl: dsl.clone(), version: *version} } } } diff --git a/crates/polars-utils/src/arena.rs b/crates/polars-utils/src/arena.rs index 2e4505c3b060..ccd09b66171e 100644 --- a/crates/polars-utils/src/arena.rs +++ b/crates/polars-utils/src/arena.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::{AtomicU32, Ordering}; + use crate::error::*; use crate::slice::GetSaferUnchecked; @@ -27,8 +29,11 @@ impl Default for Node { } } +static ARENA_VERSION: AtomicU32 = AtomicU32::new(0); + #[derive(Clone)] pub struct Arena { + version: u32, items: Vec, } @@ -41,6 +46,11 @@ impl Default for Arena { /// Simple Arena implementation /// Allocates memory and stores item in a Vec. Only deallocates when being dropped itself. impl Arena { + #[inline] + pub fn version(&self) -> u32 { + self.version + } + pub fn add(&mut self, val: T) -> Node { let idx = self.items.len(); self.items.push(val); @@ -60,12 +70,16 @@ impl Arena { } pub fn new() -> Self { - Arena { items: vec![] } + Arena { + items: vec![], + version: ARENA_VERSION.fetch_add(1, Ordering::Relaxed), + } } pub fn with_capacity(cap: usize) -> Self { Arena { items: Vec::with_capacity(cap), + version: ARENA_VERSION.fetch_add(1, Ordering::Relaxed), } } @@ -79,7 +93,6 @@ impl Arena { #[inline] pub fn get(&self, idx: Node) -> &T { - dbg!(idx.0); self.items.get(idx.0).unwrap() } From edb9440ecc4a6880ecb92ff251057398bd65192a Mon Sep 17 00:00:00 2001 From: ritchie Date: Tue, 28 May 2024 14:18:38 +0200 Subject: [PATCH 3/3] keep caches in concat --- crates/polars-lazy/src/dsl/functions.rs | 19 +++++++++---------- crates/polars-lazy/src/frame/mod.rs | 16 ++++++++++++++-- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/crates/polars-lazy/src/dsl/functions.rs b/crates/polars-lazy/src/dsl/functions.rs index ed7fcc84ede4..ec6c23fbad94 100644 --- a/crates/polars-lazy/src/dsl/functions.rs +++ b/crates/polars-lazy/src/dsl/functions.rs @@ -22,6 +22,7 @@ pub(crate) fn concat_impl>( ); let mut opt_state = lf.opt_state; + let cached_arenas = lf.cached_arena.clone(); let mut lps = Vec::with_capacity(inputs.len()); lps.push(lf.logical_plan); @@ -34,9 +35,7 @@ pub(crate) fn concat_impl>( } let lp = DslPlan::Union { inputs: lps, args }; - let mut lf = LazyFrame::from(lp); - lf.opt_state = opt_state; - Ok(lf) + Ok(LazyFrame::from_inner(lp, opt_state, cached_arenas)) } #[cfg(feature = "diagonal_concat")] @@ -56,9 +55,12 @@ pub fn concat_lf_horizontal>( args: UnionArgs, ) -> PolarsResult { let lfs = inputs.as_ref(); - let mut opt_state = lfs.first().map(|lf| lf.opt_state).ok_or_else( - || polars_err!(NoData: "Require at least one LazyFrame for horizontal concatenation"), - )?; + let (mut opt_state, cached_arena) = lfs + .first() + .map(|lf| (lf.opt_state, lf.cached_arena.clone())) + .ok_or_else( + || polars_err!(NoData: "Require at least one LazyFrame for horizontal concatenation"), + )?; for lf in &lfs[1..] { // ensure we enable file caching if any lf has it enabled @@ -72,10 +74,7 @@ pub fn concat_lf_horizontal>( inputs: lfs.iter().map(|lf| lf.logical_plan.clone()).collect(), options, }; - let mut lf = LazyFrame::from(lp); - lf.opt_state = opt_state; - - Ok(lf) + Ok(LazyFrame::from_inner(lp, opt_state, cached_arena)) } /// Concat multiple [`LazyFrame`]s vertically. diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 66a757721872..fb79c19cf7c3 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -74,7 +74,7 @@ impl IntoLazy for LazyFrame { pub struct LazyFrame { pub logical_plan: DslPlan, pub(crate) opt_state: OptState, - cached_arena: Arc>>, + pub(crate) cached_arena: Arc>>, } impl From for LazyFrame { @@ -90,12 +90,24 @@ impl From for LazyFrame { } } -struct CachedArena { +pub(crate) struct CachedArena { lp_arena: Arena, expr_arena: Arena, } impl LazyFrame { + pub(crate) fn from_inner( + logical_plan: DslPlan, + opt_state: OptState, + cached_arena: Arc>>, + ) -> Self { + Self { + logical_plan, + opt_state, + cached_arena, + } + } + /// Get a handle to the schema — a map from column names to data types — of the current /// `LazyFrame` computation. ///