diff --git a/Cargo.lock b/Cargo.lock
index acb1f7889821..4451eec39152 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3006,6 +3006,7 @@ dependencies = [
  "raw-cpuid",
  "rayon",
  "smartstring",
+ "stacker",
  "sysinfo",
  "version_check",
 ]
@@ -3076,6 +3077,15 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "psm"
+version = "0.1.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5787f7cda34e3033a72192c018bc5883100330f362ef279a8cbccfce8bb4e874"
+dependencies = [
+ "cc",
+]
+
 [[package]]
 name = "py-polars"
 version = "0.20.16"
@@ -3916,6 +3926,19 @@ dependencies = [
  "log",
 ]
 
+[[package]]
+name = "stacker"
+version = "0.1.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c886bd4480155fd3ef527d45e9ac8dd7118a898a46530b7b94c3e21866259fce"
+dependencies = [
+ "cc",
+ "cfg-if",
+ "libc",
+ "psm",
+ "winapi",
+]
+
 [[package]]
 name = "static_assertions"
 version = "1.1.0"
diff --git a/Cargo.toml b/Cargo.toml
index e0784d503851..e06e28faf4d9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -71,6 +71,7 @@ simd-json = { version = "0.13", features = ["known-key"] }
 simdutf8 = "0.1.4"
 smartstring = "1"
 sqlparser = "0.39"
+stacker = "0.1"
 streaming-iterator = "0.1.9"
 strength_reduce = "0.2"
 strum_macros = "0.25"
diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs
index df855815e5b7..ed8d2614ce64 100644
--- a/crates/polars-plan/src/logical_plan/mod.rs
+++ b/crates/polars-plan/src/logical_plan/mod.rs
@@ -3,6 +3,7 @@ use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
 
 use polars_core::prelude::*;
+use polars_utils::recursion::with_dynamic_stack;
 
 use crate::logical_plan::LogicalPlan::DataFrameScan;
 use crate::prelude::*;
@@ -136,7 +137,6 @@ impl ErrorState {
 }
 
 // https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
-#[derive(Clone)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum LogicalPlan {
     #[cfg(feature = "python")]
@@ -253,6 +253,38 @@ pub enum LogicalPlan {
     },
 }
 
+impl Clone for LogicalPlan {
+    // Autogenerated by rust-analyzer; we don't care about it looking nice,
+    // since it just calls clone on every member of every enum variant.
+    #[rustfmt::skip]
+    #[allow(clippy::clone_on_copy)]
+    fn clone(&self) -> Self {
+        with_dynamic_stack(|| {
+            match self {
+                #[cfg(feature = "python")]
+                Self::PythonScan { options } => Self::PythonScan { options: options.clone() },
+                Self::Selection { input, predicate } => Self::Selection { input: input.clone(), predicate: predicate.clone() },
+                Self::Cache { input, id, cache_hits } => Self::Cache { input: input.clone(), id: id.clone(), cache_hits: cache_hits.clone() },
+                Self::Scan { paths, file_info, predicate, file_options, scan_type } => Self::Scan { paths: paths.clone(), file_info: file_info.clone(), predicate: predicate.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() },
+                Self::DataFrameScan { df, schema, output_schema, projection, selection } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), output_schema: output_schema.clone(), projection: projection.clone(), selection: selection.clone() },
+                Self::Projection { expr, input, schema, options } => Self::Projection { expr: expr.clone(), input: input.clone(), schema: schema.clone(), options: options.clone() },
+                Self::Aggregate { input, keys, aggs, schema, apply, maintain_order, options } => Self::Aggregate { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), schema: schema.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() },
+                Self::Join { input_left, input_right, schema, left_on, right_on, options } => Self::Join { input_left: input_left.clone(), input_right: input_right.clone(), schema: schema.clone(), left_on: left_on.clone(), right_on: right_on.clone(), options: options.clone() },
+                Self::HStack { input, exprs, schema, options } => Self::HStack { input: input.clone(), exprs: exprs.clone(), schema: schema.clone(), options: options.clone() },
+                Self::Distinct { input, options } => Self::Distinct { input: input.clone(), options: options.clone() },
+                Self::Sort { input, by_column, args } => Self::Sort { input: input.clone(), by_column: by_column.clone(), args: args.clone() },
+                Self::Slice { input, offset, len } => Self::Slice { input: input.clone(), offset: offset.clone(), len: len.clone() },
+                Self::MapFunction { input, function } => Self::MapFunction { input: input.clone(), function: function.clone() },
+                Self::Union { inputs, options } => Self::Union { inputs: inputs.clone(), options: options.clone() },
+                Self::HConcat { inputs, schema, options } => Self::HConcat { inputs: inputs.clone(), schema: schema.clone(), options: options.clone() },
+                Self::Error { input, err } => Self::Error { input: input.clone(), err: err.clone() },
+                Self::ExtContext { input, contexts, schema } => Self::ExtContext { input: input.clone(), contexts: contexts.clone(), schema: schema.clone() },
+                Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() },
+            }
+        })
+    }
+}
+
 impl Default for LogicalPlan {
     fn default() -> Self {
         let df = DataFrame::new::<Series>(vec![]).unwrap();
diff --git a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs
index 0b1e9dbd4e93..b533436c6650 100644
--- a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs
+++ b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs
@@ -6,6 +6,7 @@ mod utils;
 
 use polars_core::datatypes::PlHashMap;
 use polars_core::prelude::*;
+use polars_utils::recursion::with_dynamic_stack;
 use utils::*;
 
 use super::*;
@@ -243,466 +244,489 @@ impl<'a>
PredicatePushDown<'a> { lp_arena: &mut Arena, expr_arena: &mut Arena, ) -> PolarsResult { - use ALogicalPlan::*; - - match lp { - Selection { - ref predicate, - input, - } => { - // Use a tmp_key to avoid inadvertently combining predicates that otherwise would have - // been partially pushed: - // - // (1) .filter(pl.count().over("key") == 1) - // (2) .filter(pl.col("key") == 1) - // - // (2) can be pushed past (1) but they both have the same predicate - // key name in the hashtable. - let tmp_key = Arc::::from(&*temporary_unique_key(&acc_predicates)); - acc_predicates.insert(tmp_key.clone(), predicate.clone()); - - let local_predicates = - match pushdown_eligibility(&[], &acc_predicates, expr_arena)?.0 { - PushdownEligibility::Full => vec![], - PushdownEligibility::Partial { to_local } => { - let mut out = Vec::with_capacity(to_local.len()); - for key in to_local { - out.push(acc_predicates.remove(&key).unwrap()); - } - out - }, - PushdownEligibility::NoPushdown => { - let out = acc_predicates.drain().map(|t| t.1).collect(); - acc_predicates.clear(); - out - }, - }; + with_dynamic_stack(|| { + use ALogicalPlan::*; + + match lp { + Selection { + ref predicate, + input, + } => { + // Use a tmp_key to avoid inadvertently combining predicates that otherwise would have + // been partially pushed: + // + // (1) .filter(pl.count().over("key") == 1) + // (2) .filter(pl.col("key") == 1) + // + // (2) can be pushed past (1) but they both have the same predicate + // key name in the hashtable. + let tmp_key = Arc::::from(&*temporary_unique_key(&acc_predicates)); + acc_predicates.insert(tmp_key.clone(), predicate.clone()); - if let Some(predicate) = acc_predicates.remove(&tmp_key) { - insert_and_combine_predicate(&mut acc_predicates, &predicate, expr_arena); - } + let local_predicates = + match pushdown_eligibility(&[], &acc_predicates, expr_arena)?.0 { + PushdownEligibility::Full => vec![], + PushdownEligibility::Partial { to_local } => { + let mut out = Vec::with_capacity(to_local.len()); + for key in to_local { + out.push(acc_predicates.remove(&key).unwrap()); + } + out + }, + PushdownEligibility::NoPushdown => { + let out = acc_predicates.drain().map(|t| t.1).collect(); + acc_predicates.clear(); + out + }, + }; + + if let Some(predicate) = acc_predicates.remove(&tmp_key) { + insert_and_combine_predicate(&mut acc_predicates, &predicate, expr_arena); + } - let alp = lp_arena.take(input); - let new_input = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?; + let alp = lp_arena.take(input); + let new_input = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?; - // TODO! - // If a predicates result would be influenced by earlier applied - // predicates, we simply don't pushdown this one passed this node - // However, we can do better and let it pass but store the order of the predicates - // so that we can apply them in correct order at the deepest level - Ok( - self.optional_apply_predicate( + // TODO! 
+ // If a predicates result would be influenced by earlier applied + // predicates, we simply don't pushdown this one passed this node + // However, we can do better and let it pass but store the order of the predicates + // so that we can apply them in correct order at the deepest level + Ok(self.optional_apply_predicate( new_input, local_predicates, lp_arena, expr_arena, - ), - ) - }, - DataFrameScan { - df, - schema, - output_schema, - projection, - selection, - } => { - let selection = predicate_at_scan(acc_predicates, selection, expr_arena); - let lp = DataFrameScan { + )) + }, + DataFrameScan { df, schema, output_schema, projection, selection, - }; - Ok(lp) - }, - Scan { - mut paths, - mut file_info, - ref predicate, - mut scan_type, - file_options: options, - output_schema, - } => { - for e in acc_predicates.values() { - debug_assert_aexpr_allows_predicate_pushdown(e.node(), expr_arena); - } + } => { + let selection = predicate_at_scan(acc_predicates, selection, expr_arena); + let lp = DataFrameScan { + df, + schema, + output_schema, + projection, + selection, + }; + Ok(lp) + }, + Scan { + mut paths, + mut file_info, + ref predicate, + mut scan_type, + file_options: options, + output_schema, + } => { + for e in acc_predicates.values() { + debug_assert_aexpr_allows_predicate_pushdown(e.node(), expr_arena); + } - let local_predicates = match &scan_type { - #[cfg(feature = "parquet")] - FileScan::Parquet { .. } => vec![], - #[cfg(feature = "ipc")] - FileScan::Ipc { .. } => vec![], - _ => { - // Disallow row index pushdown of other scans as they may - // not update the row index properly before applying the - // predicate (e.g. FileScan::Csv doesn't). - if let Some(ref row_index) = options.row_index { - let row_index_predicates = transfer_to_local_by_name( - expr_arena, - &mut acc_predicates, - |name| name.as_ref() == row_index.name, - ); - row_index_predicates - } else { - vec![] - } - }, - }; - let predicate = predicate_at_scan(acc_predicates, predicate.clone(), expr_arena); - - if let (true, Some(predicate)) = (file_info.hive_parts.is_some(), &predicate) { - if let Some(io_expr) = self.hive_partition_eval.unwrap()(predicate, expr_arena) - { - if let Some(stats_evaluator) = io_expr.as_stats_evaluator() { - let mut new_paths = Vec::with_capacity(paths.len()); - - for path in paths.as_ref().iter() { - file_info.update_hive_partitions(path)?; - let hive_part_stats = file_info.hive_parts.as_deref().ok_or_else(|| polars_err!(ComputeError: "cannot combine hive partitioned directories with non-hive partitioned ones"))?; - - if stats_evaluator.should_read(hive_part_stats.get_statistics())? { - new_paths.push(path.clone()); - } + let local_predicates = match &scan_type { + #[cfg(feature = "parquet")] + FileScan::Parquet { .. } => vec![], + #[cfg(feature = "ipc")] + FileScan::Ipc { .. } => vec![], + _ => { + // Disallow row index pushdown of other scans as they may + // not update the row index properly before applying the + // predicate (e.g. FileScan::Csv doesn't). 
+ if let Some(ref row_index) = options.row_index { + let row_index_predicates = transfer_to_local_by_name( + expr_arena, + &mut acc_predicates, + |name| name.as_ref() == row_index.name, + ); + row_index_predicates + } else { + vec![] } + }, + }; + let predicate = + predicate_at_scan(acc_predicates, predicate.clone(), expr_arena); + + if let (true, Some(predicate)) = (file_info.hive_parts.is_some(), &predicate) { + if let Some(io_expr) = + self.hive_partition_eval.unwrap()(predicate, expr_arena) + { + if let Some(stats_evaluator) = io_expr.as_stats_evaluator() { + let mut new_paths = Vec::with_capacity(paths.len()); + + for path in paths.as_ref().iter() { + file_info.update_hive_partitions(path)?; + let hive_part_stats = file_info.hive_parts.as_deref().ok_or_else(|| polars_err!(ComputeError: "cannot combine hive partitioned directories with non-hive partitioned ones"))?; + + if stats_evaluator + .should_read(hive_part_stats.get_statistics())? + { + new_paths.push(path.clone()); + } + } - if paths.len() != new_paths.len() { - if self.verbose { - eprintln!( - "hive partitioning: skipped {} files, first file : {}", - paths.len() - new_paths.len(), - paths[0].display() - ) + if paths.len() != new_paths.len() { + if self.verbose { + eprintln!( + "hive partitioning: skipped {} files, first file : {}", + paths.len() - new_paths.len(), + paths[0].display() + ) + } + scan_type.remove_metadata(); + } + if paths.is_empty() { + let schema = + output_schema.as_ref().unwrap_or(&file_info.schema); + let df = DataFrame::from(schema.as_ref()); + + return Ok(DataFrameScan { + df: Arc::new(df), + schema: schema.clone(), + output_schema: None, + projection: None, + selection: None, + }); + } else { + paths = Arc::from(new_paths) } - scan_type.remove_metadata(); - } - if paths.is_empty() { - let schema = output_schema.as_ref().unwrap_or(&file_info.schema); - let df = DataFrame::from(schema.as_ref()); - - return Ok(DataFrameScan { - df: Arc::new(df), - schema: schema.clone(), - output_schema: None, - projection: None, - selection: None, - }); - } else { - paths = Arc::from(new_paths) } } } - } - let mut do_optimization = match &scan_type { - #[cfg(feature = "csv")] - FileScan::Csv { .. } => options.n_rows.is_none(), - FileScan::Anonymous { function, .. } => function.allows_predicate_pushdown(), - #[allow(unreachable_patterns)] - _ => true, - }; - do_optimization &= predicate.is_some(); - - let lp = if do_optimization { - Scan { - paths, - file_info, - predicate, - file_options: options, - output_schema, - scan_type, - } - } else { - let lp = Scan { - paths, - file_info, - predicate: None, - file_options: options, - output_schema, - scan_type, + let mut do_optimization = match &scan_type { + #[cfg(feature = "csv")] + FileScan::Csv { .. } => options.n_rows.is_none(), + FileScan::Anonymous { function, .. } => { + function.allows_predicate_pushdown() + }, + #[allow(unreachable_patterns)] + _ => true, }; - if let Some(predicate) = predicate { - let input = lp_arena.add(lp); - Selection { input, predicate } + do_optimization &= predicate.is_some(); + + let lp = if do_optimization { + Scan { + paths, + file_info, + predicate, + file_options: options, + output_schema, + scan_type, + } } else { - lp - } - }; - - Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)) - }, - Distinct { input, options } => { - if let Some(ref subset) = options.subset { - // Predicates on the subset can pass. 
- let subset = subset.clone(); - let mut names_set = PlHashSet::<&str>::with_capacity(subset.len()); - for name in subset.iter() { - names_set.insert(name.as_str()); - } - - let condition = |name: Arc| !names_set.contains(name.as_ref()); - let local_predicates = - transfer_to_local_by_name(expr_arena, &mut acc_predicates, condition); + let lp = Scan { + paths, + file_info, + predicate: None, + file_options: options, + output_schema, + scan_type, + }; + if let Some(predicate) = predicate { + let input = lp_arena.add(lp); + Selection { input, predicate } + } else { + lp + } + }; - self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?; - let lp = Distinct { input, options }; Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)) - } else { - let lp = Distinct { input, options }; - self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena) - } - }, - Join { - input_left, - input_right, - left_on, - right_on, - schema, - options, - } => process_join( - self, - lp_arena, - expr_arena, - input_left, - input_right, - left_on, - right_on, - schema, - options, - acc_predicates, - ), - MapFunction { ref function, .. } => { - if function.allow_predicate_pd() { - match function { - FunctionNode::Rename { existing, new, .. } => { - let local_predicates = - process_rename(&mut acc_predicates, expr_arena, existing, new)?; - let lp = self.pushdown_and_continue( - lp, - acc_predicates, - lp_arena, - expr_arena, - false, - )?; - Ok(self.optional_apply_predicate( - lp, - local_predicates, - lp_arena, - expr_arena, - )) - }, - FunctionNode::Explode { columns, .. } => { - let condition = - |name: Arc| columns.iter().any(|s| s.as_ref() == &*name); + }, + Distinct { input, options } => { + if let Some(ref subset) = options.subset { + // Predicates on the subset can pass. + let subset = subset.clone(); + let mut names_set = PlHashSet::<&str>::with_capacity(subset.len()); + for name in subset.iter() { + names_set.insert(name.as_str()); + } - // first columns that refer to the exploded columns should be done here - let local_predicates = transfer_to_local_by_name( - expr_arena, - &mut acc_predicates, - condition, - ); + let condition = |name: Arc| !names_set.contains(name.as_ref()); + let local_predicates = + transfer_to_local_by_name(expr_arena, &mut acc_predicates, condition); - let lp = self.pushdown_and_continue( - lp, - acc_predicates, - lp_arena, - expr_arena, - false, - )?; - Ok(self.optional_apply_predicate( - lp, - local_predicates, - lp_arena, - expr_arena, - )) - }, - FunctionNode::Melt { args, .. 
} => { - let variable_name = args.variable_name.as_deref().unwrap_or("variable"); - let value_name = args.value_name.as_deref().unwrap_or("value"); - - // predicates that will be done at this level - let condition = |name: Arc| { - let name = &*name; - name == variable_name - || name == value_name - || args.value_vars.iter().any(|s| s.as_str() == name) - }; - let local_predicates = transfer_to_local_by_name( - expr_arena, - &mut acc_predicates, - condition, - ); - - let lp = self.pushdown_and_continue( - lp, - acc_predicates, - lp_arena, - expr_arena, - false, - )?; - Ok(self.optional_apply_predicate( - lp, - local_predicates, - lp_arena, - expr_arena, - )) - }, - _ => self.pushdown_and_continue( + self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?; + let lp = Distinct { input, options }; + Ok(self.optional_apply_predicate( lp, - acc_predicates, + local_predicates, lp_arena, expr_arena, - false, - ), - } - } else { - self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena) - } - }, - Aggregate { - input, - keys, - aggs, - schema, - apply, - maintain_order, - options, - } => process_group_by( - self, - lp_arena, - expr_arena, - input, - keys, - aggs, - schema, - maintain_order, - apply, - options, - acc_predicates, - ), - lp @ Union { .. } => { - let mut local_predicates = vec![]; - - // a count is influenced by a Union/Vstack - acc_predicates.retain(|_, predicate| { - if has_aexpr(predicate.node(), expr_arena, |ae| matches!(ae, AExpr::Len)) { - local_predicates.push(predicate.clone()); - false + )) } else { - true + let lp = Distinct { input, options }; + self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena) } - }); - let lp = - self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)?; - Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)) - }, - lp @ Sort { .. } => { - let mut local_predicates = vec![]; - acc_predicates.retain(|_, predicate| { - if predicate_is_sort_boundary(predicate.node(), expr_arena) { - local_predicates.push(predicate.clone()); - false - } else { - true - } - }); - let lp = - self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)?; - Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)) - }, - // Pushed down passed these nodes - lp @ Sink { .. } => { - self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false) - }, - lp @ HStack { .. } - | lp @ Projection { .. } - | lp @ SimpleProjection { .. } - | lp @ ExtContext { .. } => { - self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true) - }, - // NOT Pushed down passed these nodes - // predicates influence slice sizes - lp @ Slice { .. } => { - self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena) - }, - lp @ HConcat { .. } => { - self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena) - }, - // Caches will run predicate push-down in the `cache_states` run. - Cache { .. 
} => { - if self.block_at_cache { - self.no_pushdown(lp, acc_predicates, lp_arena, expr_arena) - } else { - self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false) - } - }, - #[cfg(feature = "python")] - PythonScan { - mut options, - predicate, - } => { - if options.pyarrow { - let predicate = predicate_at_scan(acc_predicates, predicate, expr_arena); - - if let Some(predicate) = predicate.clone() { - // simplify expressions before we translate them to pyarrow - let lp = PythonScan { - options: options.clone(), - predicate: Some(predicate), - }; - let lp_top = lp_arena.add(lp); - let stack_opt = StackOptimizer {}; - let lp_top = stack_opt - .optimize_loop( - &mut [Box::new(SimplifyExprRule {})], - expr_arena, - lp_arena, - lp_top, - ) - .unwrap(); - let PythonScan { - options: _, - predicate: Some(predicate), - } = lp_arena.take(lp_top) - else { - unreachable!() - }; + }, + Join { + input_left, + input_right, + left_on, + right_on, + schema, + options, + } => process_join( + self, + lp_arena, + expr_arena, + input_left, + input_right, + left_on, + right_on, + schema, + options, + acc_predicates, + ), + MapFunction { ref function, .. } => { + if function.allow_predicate_pd() { + match function { + FunctionNode::Rename { existing, new, .. } => { + let local_predicates = + process_rename(&mut acc_predicates, expr_arena, existing, new)?; + let lp = self.pushdown_and_continue( + lp, + acc_predicates, + lp_arena, + expr_arena, + false, + )?; + Ok(self.optional_apply_predicate( + lp, + local_predicates, + lp_arena, + expr_arena, + )) + }, + FunctionNode::Explode { columns, .. } => { + let condition = + |name: Arc| columns.iter().any(|s| s.as_ref() == &*name); - match super::super::pyarrow::predicate_to_pa( - predicate.node(), - expr_arena, - Default::default(), - ) { - // we we able to create a pyarrow string, mutate the options - Some(eval_str) => options.predicate = Some(eval_str), - // we were not able to translate the predicate - // apply here - None => { - let lp = PythonScan { - options, - predicate: None, + // first columns that refer to the exploded columns should be done here + let local_predicates = transfer_to_local_by_name( + expr_arena, + &mut acc_predicates, + condition, + ); + + let lp = self.pushdown_and_continue( + lp, + acc_predicates, + lp_arena, + expr_arena, + false, + )?; + Ok(self.optional_apply_predicate( + lp, + local_predicates, + lp_arena, + expr_arena, + )) + }, + FunctionNode::Melt { args, .. 
} => { + let variable_name = + args.variable_name.as_deref().unwrap_or("variable"); + let value_name = args.value_name.as_deref().unwrap_or("value"); + + // predicates that will be done at this level + let condition = |name: Arc| { + let name = &*name; + name == variable_name + || name == value_name + || args.value_vars.iter().any(|s| s.as_str() == name) }; - return Ok(self.optional_apply_predicate( + let local_predicates = transfer_to_local_by_name( + expr_arena, + &mut acc_predicates, + condition, + ); + + let lp = self.pushdown_and_continue( + lp, + acc_predicates, + lp_arena, + expr_arena, + false, + )?; + Ok(self.optional_apply_predicate( lp, - vec![predicate], + local_predicates, lp_arena, expr_arena, - )); + )) }, + _ => self.pushdown_and_continue( + lp, + acc_predicates, + lp_arena, + expr_arena, + false, + ), } + } else { + self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena) } - Ok(PythonScan { options, predicate }) - } else { - self.no_pushdown_restart_opt( - PythonScan { options, predicate }, + }, + Aggregate { + input, + keys, + aggs, + schema, + apply, + maintain_order, + options, + } => process_group_by( + self, + lp_arena, + expr_arena, + input, + keys, + aggs, + schema, + maintain_order, + apply, + options, + acc_predicates, + ), + lp @ Union { .. } => { + let mut local_predicates = vec![]; + + // a count is influenced by a Union/Vstack + acc_predicates.retain(|_, predicate| { + if has_aexpr(predicate.node(), expr_arena, |ae| matches!(ae, AExpr::Len)) { + local_predicates.push(predicate.clone()); + false + } else { + true + } + }); + let lp = self.pushdown_and_continue( + lp, acc_predicates, lp_arena, expr_arena, - ) - } - }, - Invalid => unreachable!(), - } + false, + )?; + Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)) + }, + lp @ Sort { .. } => { + let mut local_predicates = vec![]; + acc_predicates.retain(|_, predicate| { + if predicate_is_sort_boundary(predicate.node(), expr_arena) { + local_predicates.push(predicate.clone()); + false + } else { + true + } + }); + let lp = self.pushdown_and_continue( + lp, + acc_predicates, + lp_arena, + expr_arena, + false, + )?; + Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)) + }, + // Pushed down passed these nodes + lp @ Sink { .. } => { + self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false) + }, + lp @ HStack { .. } + | lp @ Projection { .. } + | lp @ SimpleProjection { .. } + | lp @ ExtContext { .. } => { + self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true) + }, + // NOT Pushed down passed these nodes + // predicates influence slice sizes + lp @ Slice { .. } => { + self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena) + }, + lp @ HConcat { .. } => { + self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena) + }, + // Caches will run predicate push-down in the `cache_states` run. + Cache { .. 
} => { + if self.block_at_cache { + self.no_pushdown(lp, acc_predicates, lp_arena, expr_arena) + } else { + self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false) + } + }, + #[cfg(feature = "python")] + PythonScan { + mut options, + predicate, + } => { + if options.pyarrow { + let predicate = predicate_at_scan(acc_predicates, predicate, expr_arena); + + if let Some(predicate) = predicate.clone() { + // simplify expressions before we translate them to pyarrow + let lp = PythonScan { + options: options.clone(), + predicate: Some(predicate), + }; + let lp_top = lp_arena.add(lp); + let stack_opt = StackOptimizer {}; + let lp_top = stack_opt + .optimize_loop( + &mut [Box::new(SimplifyExprRule {})], + expr_arena, + lp_arena, + lp_top, + ) + .unwrap(); + let PythonScan { + options: _, + predicate: Some(predicate), + } = lp_arena.take(lp_top) + else { + unreachable!() + }; + + match super::super::pyarrow::predicate_to_pa( + predicate.node(), + expr_arena, + Default::default(), + ) { + // we we able to create a pyarrow string, mutate the options + Some(eval_str) => options.predicate = Some(eval_str), + // we were not able to translate the predicate + // apply here + None => { + let lp = PythonScan { + options, + predicate: None, + }; + return Ok(self.optional_apply_predicate( + lp, + vec![predicate], + lp_arena, + expr_arena, + )); + }, + } + } + Ok(PythonScan { options, predicate }) + } else { + self.no_pushdown_restart_opt( + PythonScan { options, predicate }, + acc_predicates, + lp_arena, + expr_arena, + ) + } + }, + Invalid => unreachable!(), + } + }) } pub(crate) fn optimize( diff --git a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs index efc200238a11..6dea5dcc2763 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs @@ -12,6 +12,7 @@ mod semi_anti_join; use polars_core::datatypes::PlHashSet; use polars_core::prelude::*; use polars_io::RowIndex; +use polars_utils::recursion::with_dynamic_stack; #[cfg(feature = "semi_anti_join")] use semi_anti_join::process_semi_anti_join; @@ -325,395 +326,397 @@ impl ProjectionPushDown { lp_arena: &mut Arena, expr_arena: &mut Arena, ) -> PolarsResult { - use ALogicalPlan::*; - - match logical_plan { - Projection { expr, input, .. } => process_projection( - self, - input, - expr.exprs(), - acc_projections, - projected_names, - projections_seen, - lp_arena, - expr_arena, - ), - SimpleProjection { columns, input, .. } => { - let exprs = names_to_expr_irs(columns.iter_names(), expr_arena); - process_projection( + with_dynamic_stack(|| { + use ALogicalPlan::*; + + match logical_plan { + Projection { expr, input, .. } => process_projection( self, input, - exprs, + expr.exprs(), acc_projections, projected_names, projections_seen, lp_arena, expr_arena, - ) - }, - DataFrameScan { - df, - schema, - mut output_schema, - selection, - .. - } => { - let mut projection = None; - if !acc_projections.is_empty() { - output_schema = Some(Arc::new(update_scan_schema( - &acc_projections, + ), + SimpleProjection { columns, input, .. 
} => { + let exprs = names_to_expr_irs(columns.iter_names(), expr_arena); + process_projection( + self, + input, + exprs, + acc_projections, + projected_names, + projections_seen, + lp_arena, expr_arena, - &schema, - false, - )?)); - projection = get_scan_columns(&mut acc_projections, expr_arena, None); - } - let lp = DataFrameScan { + ) + }, + DataFrameScan { df, schema, - output_schema, - projection, + mut output_schema, selection, - }; - Ok(lp) - }, - #[cfg(feature = "python")] - PythonScan { - mut options, - predicate, - } => { - options.with_columns = get_scan_columns(&mut acc_projections, expr_arena, None); - - options.output_schema = if options.with_columns.is_none() { - None - } else { - Some(Arc::new(update_scan_schema( - &acc_projections, - expr_arena, - &options.schema, - true, - )?)) - }; - Ok(PythonScan { options, predicate }) - }, - Scan { - paths, - file_info, - scan_type, - predicate, - mut file_options, - mut output_schema, - } => { - let mut do_optimization = true; - #[allow(irrefutable_let_patterns)] - if let FileScan::Anonymous { ref function, .. } = scan_type { - do_optimization = function.allows_projection_pushdown(); - } - - if do_optimization { - file_options.with_columns = get_scan_columns( - &mut acc_projections, - expr_arena, - file_options.row_index.as_ref(), - ); + .. + } => { + let mut projection = None; + if !acc_projections.is_empty() { + output_schema = Some(Arc::new(update_scan_schema( + &acc_projections, + expr_arena, + &schema, + false, + )?)); + projection = get_scan_columns(&mut acc_projections, expr_arena, None); + } + let lp = DataFrameScan { + df, + schema, + output_schema, + projection, + selection, + }; + Ok(lp) + }, + #[cfg(feature = "python")] + PythonScan { + mut options, + predicate, + } => { + options.with_columns = get_scan_columns(&mut acc_projections, expr_arena, None); - output_schema = if file_options.with_columns.is_none() { + options.output_schema = if options.with_columns.is_none() { None } else { - let mut schema = update_scan_schema( + Some(Arc::new(update_scan_schema( &acc_projections, expr_arena, - &file_info.schema, - scan_type.sort_projection(&file_options), - )?; - // Hive partitions are created AFTER the projection, so the output - // schema is incorrect. Here we ensure the columns that are projected and hive - // parts are added at the proper place in the schema, which is at the end. - if let Some(parts) = file_info.hive_parts.as_deref() { - let partition_schema = parts.schema(); - for (name, _) in partition_schema.iter() { - if let Some(dt) = schema.shift_remove(name) { - schema.with_column(name.clone(), dt); - } - } - } - Some(Arc::new(schema)) + &options.schema, + true, + )?)) }; - } - - let lp = Scan { + Ok(PythonScan { options, predicate }) + }, + Scan { paths, file_info, - output_schema, scan_type, predicate, - file_options, - }; - Ok(lp) - }, - Sort { - input, - by_column, - args, - } => { - if !acc_projections.is_empty() { - // Make sure that the column(s) used for the sort is projected - by_column.iter().for_each(|node| { - add_expr_to_accumulated( - node.node(), + mut file_options, + mut output_schema, + } => { + let mut do_optimization = true; + #[allow(irrefutable_let_patterns)] + if let FileScan::Anonymous { ref function, .. 
} = scan_type { + do_optimization = function.allows_projection_pushdown(); + } + + if do_optimization { + file_options.with_columns = get_scan_columns( &mut acc_projections, - &mut projected_names, expr_arena, + file_options.row_index.as_ref(), ); - }); - } - self.pushdown_and_assign( - input, - acc_projections, - projected_names, - projections_seen, - lp_arena, - expr_arena, - )?; - Ok(Sort { + output_schema = if file_options.with_columns.is_none() { + None + } else { + let mut schema = update_scan_schema( + &acc_projections, + expr_arena, + &file_info.schema, + scan_type.sort_projection(&file_options), + )?; + // Hive partitions are created AFTER the projection, so the output + // schema is incorrect. Here we ensure the columns that are projected and hive + // parts are added at the proper place in the schema, which is at the end. + if let Some(parts) = file_info.hive_parts.as_deref() { + let partition_schema = parts.schema(); + for (name, _) in partition_schema.iter() { + if let Some(dt) = schema.shift_remove(name) { + schema.with_column(name.clone(), dt); + } + } + } + Some(Arc::new(schema)) + }; + } + + let lp = Scan { + paths, + file_info, + output_schema, + scan_type, + predicate, + file_options, + }; + Ok(lp) + }, + Sort { input, by_column, args, - }) - }, - Distinct { input, options } => { - // make sure that the set of unique columns is projected - if !acc_projections.is_empty() { - if let Some(subset) = options.subset.as_ref() { - subset.iter().for_each(|name| { - add_str_to_accumulated( - name, - &mut acc_projections, - &mut projected_names, - expr_arena, - ) - }) - } else { - // distinct needs all columns - let input_schema = lp_arena.get(input).schema(lp_arena); - for name in input_schema.iter_names() { - add_str_to_accumulated( - name.as_str(), + } => { + if !acc_projections.is_empty() { + // Make sure that the column(s) used for the sort is projected + by_column.iter().for_each(|node| { + add_expr_to_accumulated( + node.node(), &mut acc_projections, &mut projected_names, expr_arena, - ) + ); + }); + } + + self.pushdown_and_assign( + input, + acc_projections, + projected_names, + projections_seen, + lp_arena, + expr_arena, + )?; + Ok(Sort { + input, + by_column, + args, + }) + }, + Distinct { input, options } => { + // make sure that the set of unique columns is projected + if !acc_projections.is_empty() { + if let Some(subset) = options.subset.as_ref() { + subset.iter().for_each(|name| { + add_str_to_accumulated( + name, + &mut acc_projections, + &mut projected_names, + expr_arena, + ) + }) + } else { + // distinct needs all columns + let input_schema = lp_arena.get(input).schema(lp_arena); + for name in input_schema.iter_names() { + add_str_to_accumulated( + name.as_str(), + &mut acc_projections, + &mut projected_names, + expr_arena, + ) + } } } - } - self.pushdown_and_assign( + self.pushdown_and_assign( + input, + acc_projections, + projected_names, + projections_seen, + lp_arena, + expr_arena, + )?; + Ok(Distinct { input, options }) + }, + Selection { predicate, input } => { + if !acc_projections.is_empty() { + // make sure that the filter column is projected + add_expr_to_accumulated( + predicate.node(), + &mut acc_projections, + &mut projected_names, + expr_arena, + ); + }; + self.pushdown_and_assign( + input, + acc_projections, + projected_names, + projections_seen, + lp_arena, + expr_arena, + )?; + Ok(Selection { predicate, input }) + }, + Aggregate { input, + keys, + aggs, + apply, + schema, + maintain_order, + options, + } => process_group_by( + self, + input, + 
keys, + aggs, + apply, + schema, + maintain_order, + options, acc_projections, projected_names, projections_seen, lp_arena, expr_arena, - )?; - Ok(Distinct { input, options }) - }, - Selection { predicate, input } => { - if !acc_projections.is_empty() { - // make sure that the filter column is projected - add_expr_to_accumulated( - predicate.node(), - &mut acc_projections, - &mut projected_names, + ), + Join { + input_left, + input_right, + left_on, + right_on, + options, + schema, + } => match options.args.how { + #[cfg(feature = "semi_anti_join")] + JoinType::Semi | JoinType::Anti => process_semi_anti_join( + self, + input_left, + input_right, + left_on, + right_on, + options, + acc_projections, + projected_names, + projections_seen, + lp_arena, + expr_arena, + ), + _ => process_join( + self, + input_left, + input_right, + left_on, + right_on, + options, + acc_projections, + projected_names, + projections_seen, + lp_arena, expr_arena, - ); - }; - self.pushdown_and_assign( + &schema, + ), + }, + HStack { + input, + exprs, + options, + .. + } => process_hstack( + self, input, + exprs.exprs(), + options, acc_projections, projected_names, projections_seen, lp_arena, expr_arena, - )?; - Ok(Selection { predicate, input }) - }, - Aggregate { - input, - keys, - aggs, - apply, - schema, - maintain_order, - options, - } => process_group_by( - self, - input, - keys, - aggs, - apply, - schema, - maintain_order, - options, - acc_projections, - projected_names, - projections_seen, - lp_arena, - expr_arena, - ), - Join { - input_left, - input_right, - left_on, - right_on, - options, - schema, - } => match options.args.how { - #[cfg(feature = "semi_anti_join")] - JoinType::Semi | JoinType::Anti => process_semi_anti_join( + ), + ExtContext { + input, contexts, .. + } => { + // local projections are ignored. These are just root nodes + // complex expression will still be done later + let _local_projections = self.pushdown_and_assign_check_schema( + input, + acc_projections, + projections_seen, + lp_arena, + expr_arena, + false, + )?; + + let mut new_schema = lp_arena + .get(input) + .schema(lp_arena) + .as_ref() + .as_ref() + .clone(); + + for node in &contexts { + let other_schema = lp_arena.get(*node).schema(lp_arena); + for fld in other_schema.iter_fields() { + if new_schema.get(fld.name()).is_none() { + new_schema.with_column(fld.name, fld.dtype); + } + } + } + + Ok(ExtContext { + input, + contexts, + schema: Arc::new(new_schema), + }) + }, + MapFunction { + input, + ref function, + } => functions::process_functions( self, - input_left, - input_right, - left_on, - right_on, - options, + input, + function, acc_projections, projected_names, projections_seen, lp_arena, expr_arena, ), - _ => process_join( + HConcat { + inputs, + schema, + options, + } => process_hconcat( self, - input_left, - input_right, - left_on, - right_on, + inputs, + schema, options, acc_projections, + projections_seen, + lp_arena, + expr_arena, + ), + lp @ Union { .. } => process_generic( + self, + lp, + acc_projections, projected_names, projections_seen, lp_arena, expr_arena, - &schema, ), - }, - HStack { - input, - exprs, - options, - .. - } => process_hstack( - self, - input, - exprs.exprs(), - options, - acc_projections, - projected_names, - projections_seen, - lp_arena, - expr_arena, - ), - ExtContext { - input, contexts, .. - } => { - // local projections are ignored. 
These are just root nodes - // complex expression will still be done later - let _local_projections = self.pushdown_and_assign_check_schema( - input, + // These nodes only have inputs and exprs, so we can use same logic. + lp @ Slice { .. } | lp @ Sink { .. } => process_generic( + self, + lp, acc_projections, + projected_names, projections_seen, lp_arena, expr_arena, - false, - )?; - - let mut new_schema = lp_arena - .get(input) - .schema(lp_arena) - .as_ref() - .as_ref() - .clone(); - - for node in &contexts { - let other_schema = lp_arena.get(*node).schema(lp_arena); - for fld in other_schema.iter_fields() { - if new_schema.get(fld.name()).is_none() { - new_schema.with_column(fld.name, fld.dtype); - } + ), + Cache { .. } => { + // projections above this cache will be accumulated and pushed down + // later + // the redundant projection will be cleaned in the fast projection optimization + // phase. + if acc_projections.is_empty() { + Ok(logical_plan) + } else { + Ok( + ALogicalPlanBuilder::from_lp(logical_plan, expr_arena, lp_arena) + .project_simple_nodes(acc_projections) + .unwrap() + .build(), + ) } - } - - Ok(ExtContext { - input, - contexts, - schema: Arc::new(new_schema), - }) - }, - MapFunction { - input, - ref function, - } => functions::process_functions( - self, - input, - function, - acc_projections, - projected_names, - projections_seen, - lp_arena, - expr_arena, - ), - HConcat { - inputs, - schema, - options, - } => process_hconcat( - self, - inputs, - schema, - options, - acc_projections, - projections_seen, - lp_arena, - expr_arena, - ), - lp @ Union { .. } => process_generic( - self, - lp, - acc_projections, - projected_names, - projections_seen, - lp_arena, - expr_arena, - ), - // These nodes only have inputs and exprs, so we can use same logic. - lp @ Slice { .. } | lp @ Sink { .. } => process_generic( - self, - lp, - acc_projections, - projected_names, - projections_seen, - lp_arena, - expr_arena, - ), - Cache { .. } => { - // projections above this cache will be accumulated and pushed down - // later - // the redundant projection will be cleaned in the fast projection optimization - // phase. - if acc_projections.is_empty() { - Ok(logical_plan) - } else { - Ok( - ALogicalPlanBuilder::from_lp(logical_plan, expr_arena, lp_arena) - .project_simple_nodes(acc_projections) - .unwrap() - .build(), - ) - } - }, - Invalid => unreachable!(), - } + }, + Invalid => unreachable!(), + } + }) } pub fn optimize( diff --git a/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs b/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs index 4c595dcc47b2..40bc01aeb53a 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs @@ -1,4 +1,5 @@ use polars_core::prelude::*; +use polars_utils::recursion::with_dynamic_stack; use crate::logical_plan::projection_expr::ProjectionExprs; use crate::prelude::*; @@ -145,266 +146,267 @@ impl SlicePushDown { lp_arena: &mut Arena, expr_arena: &mut Arena, ) -> PolarsResult { - use ALogicalPlan::*; + with_dynamic_stack(|| { + use ALogicalPlan::*; - match (lp, state) { - #[cfg(feature = "python")] - (PythonScan { - mut options, - predicate, - }, - // TODO! we currently skip slice pushdown if there is a predicate. 
- // we can modify the readers to only limit after predicates have been applied - Some(state)) if state.offset == 0 && predicate.is_none() => { - options.n_rows = Some(state.len as usize); - let lp = PythonScan { - options, - predicate - }; - Ok(lp) - } - #[cfg(feature = "csv")] - (Scan { - paths, - file_info, - output_schema, - file_options: mut options, - predicate, - scan_type: FileScan::Csv {options: mut csv_options} - }, Some(state)) if predicate.is_none() && state.offset >= 0 => { - options.n_rows = Some(state.len as usize); - csv_options.skip_rows += state.offset as usize; - - let lp = Scan { + match (lp, state) { + #[cfg(feature = "python")] + (PythonScan { + mut options, + predicate, + }, + // TODO! we currently skip slice pushdown if there is a predicate. + // we can modify the readers to only limit after predicates have been applied + Some(state)) if state.offset == 0 && predicate.is_none() => { + options.n_rows = Some(state.len as usize); + let lp = PythonScan { + options, + predicate + }; + Ok(lp) + } + #[cfg(feature = "csv")] + (Scan { paths, file_info, output_schema, - scan_type: FileScan::Csv {options: csv_options}, - file_options: options, + file_options: mut options, predicate, - }; - Ok(lp) - }, - // TODO! we currently skip slice pushdown if there is a predicate. - (Scan { - paths, - file_info, - output_schema, - file_options: mut options, - predicate, - scan_type - }, Some(state)) if state.offset == 0 && predicate.is_none() => { - options.n_rows = Some(state.len as usize); - let lp = Scan { + scan_type: FileScan::Csv {options: mut csv_options} + }, Some(state)) if predicate.is_none() && state.offset >= 0 => { + options.n_rows = Some(state.len as usize); + csv_options.skip_rows += state.offset as usize; + + let lp = Scan { + paths, + file_info, + output_schema, + scan_type: FileScan::Csv {options: csv_options}, + file_options: options, + predicate, + }; + Ok(lp) + }, + // TODO! we currently skip slice pushdown if there is a predicate. 
+ (Scan { paths, file_info, output_schema, + file_options: mut options, predicate, - file_options: options, scan_type - }; + }, Some(state)) if state.offset == 0 && predicate.is_none() => { + options.n_rows = Some(state.len as usize); + let lp = Scan { + paths, + file_info, + output_schema, + predicate, + file_options: options, + scan_type + }; - Ok(lp) - } - (Union {mut inputs, mut options }, Some(state)) => { - options.slice = Some((state.offset, state.len as usize)); - if state.offset == 0 { - for input in &mut inputs { - let input_lp = lp_arena.take(*input); - let input_lp = self.pushdown(input_lp, Some(state), lp_arena, expr_arena)?; - lp_arena.replace(*input, input_lp); - } + Ok(lp) } - Ok(Union {inputs, options}) - }, - (Join { - input_left, - input_right, - schema, - left_on, - right_on, - mut options - }, Some(state)) if !self.streaming => { - // first restart optimization in both inputs and get the updated LP - let lp_left = lp_arena.take(input_left); - let lp_left = self.pushdown(lp_left, None, lp_arena, expr_arena)?; - let input_left = lp_arena.add(lp_left); - - let lp_right = lp_arena.take(input_right); - let lp_right = self.pushdown(lp_right, None, lp_arena, expr_arena)?; - let input_right = lp_arena.add(lp_right); - - // then assign the slice state to the join operation - - let mut_options = Arc::make_mut(&mut options); - mut_options.args.slice = Some((state.offset, state.len as usize)); - - Ok(Join { + (Union {mut inputs, mut options }, Some(state)) => { + options.slice = Some((state.offset, state.len as usize)); + if state.offset == 0 { + for input in &mut inputs { + let input_lp = lp_arena.take(*input); + let input_lp = self.pushdown(input_lp, Some(state), lp_arena, expr_arena)?; + lp_arena.replace(*input, input_lp); + } + } + Ok(Union {inputs, options}) + }, + (Join { input_left, input_right, schema, left_on, right_on, - options - }) - } - (Aggregate { input, keys, aggs, schema, apply, maintain_order, mut options }, Some(state)) => { - // first restart optimization in inputs and get the updated LP - let input_lp = lp_arena.take(input); - let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?; - let input= lp_arena.add(input_lp); + mut options + }, Some(state)) if !self.streaming => { + // first restart optimization in both inputs and get the updated LP + let lp_left = lp_arena.take(input_left); + let lp_left = self.pushdown(lp_left, None, lp_arena, expr_arena)?; + let input_left = lp_arena.add(lp_left); - let mut_options= Arc::make_mut(&mut options); - mut_options.slice = Some((state.offset, state.len as usize)); + let lp_right = lp_arena.take(input_right); + let lp_right = self.pushdown(lp_right, None, lp_arena, expr_arena)?; + let input_right = lp_arena.add(lp_right); - Ok(Aggregate { - input, - keys, - aggs, - schema, - apply, - maintain_order, - options - }) - } - (Distinct {input, mut options}, Some(state)) => { - // first restart optimization in inputs and get the updated LP - let input_lp = lp_arena.take(input); - let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?; - let input= lp_arena.add(input_lp); - options.slice = Some((state.offset, state.len as usize)); - Ok(Distinct { - input, - options, - }) - } - (Sort {input, by_column, mut args}, Some(state)) => { - // first restart optimization in inputs and get the updated LP - let input_lp = lp_arena.take(input); - let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?; - let input= lp_arena.add(input_lp); + // then assign the slice state to the join operation - args.slice 
= Some((state.offset, state.len as usize)); - Ok(Sort { + let mut_options = Arc::make_mut(&mut options); + mut_options.args.slice = Some((state.offset, state.len as usize)); + + Ok(Join { + input_left, + input_right, + schema, + left_on, + right_on, + options + }) + } + (Aggregate { input, keys, aggs, schema, apply, maintain_order, mut options }, Some(state)) => { + // first restart optimization in inputs and get the updated LP + let input_lp = lp_arena.take(input); + let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?; + let input= lp_arena.add(input_lp); + + let mut_options= Arc::make_mut(&mut options); + mut_options.slice = Some((state.offset, state.len as usize)); + + Ok(Aggregate { + input, + keys, + aggs, + schema, + apply, + maintain_order, + options + }) + } + (Distinct {input, mut options}, Some(state)) => { + // first restart optimization in inputs and get the updated LP + let input_lp = lp_arena.take(input); + let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?; + let input= lp_arena.add(input_lp); + options.slice = Some((state.offset, state.len as usize)); + Ok(Distinct { + input, + options, + }) + } + (Sort {input, by_column, mut args}, Some(state)) => { + // first restart optimization in inputs and get the updated LP + let input_lp = lp_arena.take(input); + let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?; + let input= lp_arena.add(input_lp); + + args.slice = Some((state.offset, state.len as usize)); + Ok(Sort { + input, + by_column, + args + }) + } + (Slice { input, - by_column, - args - }) - } - (Slice { - input, - offset, - len - }, Some(previous_state)) => { - let alp = lp_arena.take(input); - let state = Some(if previous_state.offset == offset { - State { - offset, - len: std::cmp::min(len, previous_state.len) - } - } else { - State { - offset, - len - } - }); - let lp = self.pushdown(alp, state, lp_arena, expr_arena)?; - let input = lp_arena.add(lp); - Ok(Slice { + offset, + len + }, Some(previous_state)) => { + let alp = lp_arena.take(input); + let state = Some(if previous_state.offset == offset { + State { + offset, + len: std::cmp::min(len, previous_state.len) + } + } else { + State { + offset, + len + } + }); + let lp = self.pushdown(alp, state, lp_arena, expr_arena)?; + let input = lp_arena.add(lp); + Ok(Slice { + input, + offset: previous_state.offset, + len: previous_state.len + }) + } + (Slice { input, - offset: previous_state.offset, - len: previous_state.len - }) - } - (Slice { - input, - offset, - len - }, None) => { - let alp = lp_arena.take(input); - let state = Some(State { offset, len - }); - self.pushdown(alp, state, lp_arena, expr_arena) - } - // [Do not pushdown] boundary - // here we do not pushdown. - // we reset the state and then start the optimization again - m @ (Selection { .. 
}, _) - // other blocking nodes - | m @ (DataFrameScan {..}, _) - | m @ (Sort {..}, _) - | m @ (MapFunction {function: FunctionNode::Explode {..}, ..}, _) - | m @ (MapFunction {function: FunctionNode::Melt {..}, ..}, _) - | m @ (Cache {..}, _) - | m @ (Distinct {..}, _) - | m @ (Aggregate{..},_) - // blocking in streaming - | m @ (Join{..},_) - => { - let (lp, state) = m; - self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena) - } - // [Pushdown] - (MapFunction {input, function}, _) if function.allow_predicate_pd() => { - let lp = MapFunction {input, function}; - self.pushdown_and_continue(lp, state, lp_arena, expr_arena) - }, - // [NO Pushdown] - m @ (MapFunction {..}, _) => { - let (lp, state) = m; - self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena) - } - // [Pushdown] - // these nodes will be pushed down. - // State is None, we can continue - m @(Projection{..}, None) - => { - let (lp, state) = m; - self.pushdown_and_continue(lp, state, lp_arena, expr_arena) - } - // there is state, inspect the projection to determine how to deal with it - (Projection {input, expr, schema, options}, Some(_)) => { - if can_pushdown_slice_past_projections(&expr, expr_arena).1 { - let lp = Projection {input, expr, schema, options}; - self.pushdown_and_continue(lp, state, lp_arena, expr_arena) + }, None) => { + let alp = lp_arena.take(input); + let state = Some(State { + offset, + len + }); + self.pushdown(alp, state, lp_arena, expr_arena) } - // don't push down slice, but restart optimization - else { - let lp = Projection {input, expr, schema, options}; + // [Do not pushdown] boundary + // here we do not pushdown. + // we reset the state and then start the optimization again + m @ (Selection { .. }, _) + // other blocking nodes + | m @ (DataFrameScan {..}, _) + | m @ (Sort {..}, _) + | m @ (MapFunction {function: FunctionNode::Explode {..}, ..}, _) + | m @ (MapFunction {function: FunctionNode::Melt {..}, ..}, _) + | m @ (Cache {..}, _) + | m @ (Distinct {..}, _) + | m @ (Aggregate{..},_) + // blocking in streaming + | m @ (Join{..},_) + => { + let (lp, state) = m; self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena) } - } - (HStack {input, exprs, schema, options}, _) => { - let check = can_pushdown_slice_past_projections(&exprs, expr_arena); + // [Pushdown] + (MapFunction {input, function}, _) if function.allow_predicate_pd() => { + let lp = MapFunction {input, function}; + self.pushdown_and_continue(lp, state, lp_arena, expr_arena) + }, + // [NO Pushdown] + m @ (MapFunction {..}, _) => { + let (lp, state) = m; + self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena) + } + // [Pushdown] + // these nodes will be pushed down. 
+                // State is None, we can continue
+                m @(Projection{..}, None)
+                => {
+                    let (lp, state) = m;
+                    self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
+                }
+                // there is state, inspect the projection to determine how to deal with it
+                (Projection {input, expr, schema, options}, Some(_)) => {
+                    if can_pushdown_slice_past_projections(&expr, expr_arena).1 {
+                        let lp = Projection {input, expr, schema, options};
+                        self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
+                    }
+                    // don't push down slice, but restart optimization
+                    else {
+                        let lp = Projection {input, expr, schema, options};
+                        self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
+                    }
+                }
+                (HStack {input, exprs, schema, options}, _) => {
+                    let check = can_pushdown_slice_past_projections(&exprs, expr_arena);
+
+                    if (
+                        // If the schema length is greater, then an input column is being projected, so
+                        // the exprs in with_columns do not need to have an input column name.
+                        schema.len() > exprs.len() && check.0
+                    )
+                    || check.1 // e.g. select(c).with_columns(c = c + 1)
+                    {
+                        let lp = HStack {input, exprs, schema, options};
+                        self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
+                    }
+                    // don't push down slice, but restart optimization
+                    else {
+                        let lp = HStack {input, exprs, schema, options};
+                        self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
+                    }
+                }
+                (HConcat {inputs, schema, options}, _) => {
+                    // Slice can always be pushed down for horizontal concatenation
+                    let lp = HConcat {inputs, schema, options};
+                    self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
+                }
+                (catch_all, state) => {
+                    self.no_pushdown_finish_opt(catch_all, state, lp_arena)
+                }
+            }
+        })
     }
 
     pub fn optimize(
diff --git a/crates/polars-plan/src/logical_plan/visitor/visitors.rs b/crates/polars-plan/src/logical_plan/visitor/visitors.rs
index 1dfcb74187db..3f3e3436f401 100644
--- a/crates/polars-plan/src/logical_plan/visitor/visitors.rs
+++ b/crates/polars-plan/src/logical_plan/visitor/visitors.rs
@@ -1,3 +1,5 @@
+use polars_utils::recursion::with_dynamic_stack;
+
 use super::*;
 
 /// An implementor of this trait decides how and in which order its nodes get traversed
@@ -12,39 +14,43 @@ pub trait TreeWalker: Sized {
 
     /// Walks all nodes in depth-first-order.
     fn visit(&self, visitor: &mut dyn Visitor) -> PolarsResult<VisitRecursion> {
-        match visitor.pre_visit(self)? {
-            VisitRecursion::Continue => {},
-            // If the recursion should skip, do not apply to its children. And let the recursion continue
-            VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
-            // If the recursion should stop, do not apply to its children
-            VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
-        };
+        with_dynamic_stack(|| {
+            match visitor.pre_visit(self)? {
+                VisitRecursion::Continue => {},
+                // If the recursion should skip, do not apply to its children. And let the recursion continue
+                VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
+                // If the recursion should stop, do not apply to its children
+                VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
+            };
 
-        match self.apply_children(&mut |node| node.visit(visitor))? {
-            // let the recursion continue
-            VisitRecursion::Continue | VisitRecursion::Skip => {},
-            // If the recursion should stop, no further post visit will be performed
-            VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
-        }
+            match self.apply_children(&mut |node| node.visit(visitor))? {
+                // let the recursion continue
+                VisitRecursion::Continue | VisitRecursion::Skip => {},
+                // If the recursion should stop, no further post visit will be performed
+                VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
+            }
 
-        visitor.post_visit(self)
+            visitor.post_visit(self)
+        })
     }
 
     fn rewrite(self, rewriter: &mut dyn RewritingVisitor) -> PolarsResult<Self> {
-        let mutate_this_node = match rewriter.pre_visit(&self)? {
-            RewriteRecursion::MutateAndStop => return rewriter.mutate(self),
-            RewriteRecursion::Stop => return Ok(self),
-            RewriteRecursion::MutateAndContinue => true,
-            RewriteRecursion::NoMutateAndContinue => false,
-        };
+        with_dynamic_stack(|| {
+            let mutate_this_node = match rewriter.pre_visit(&self)? {
+                RewriteRecursion::MutateAndStop => return rewriter.mutate(self),
+                RewriteRecursion::Stop => return Ok(self),
+                RewriteRecursion::MutateAndContinue => true,
+                RewriteRecursion::NoMutateAndContinue => false,
+            };
 
-        let after_applied_children = self.map_children(&mut |node| node.rewrite(rewriter))?;
+            let after_applied_children = self.map_children(&mut |node| node.rewrite(rewriter))?;
 
-        if mutate_this_node {
-            rewriter.mutate(after_applied_children)
-        } else {
-            Ok(after_applied_children)
-        }
+            if mutate_this_node {
+                rewriter.mutate(after_applied_children)
+            } else {
+                Ok(after_applied_children)
+            }
+        })
     }
 }
diff --git a/crates/polars-utils/Cargo.toml b/crates/polars-utils/Cargo.toml
index e2810b67daaf..c237d8d56a29 100644
--- a/crates/polars-utils/Cargo.toml
+++ b/crates/polars-utils/Cargo.toml
@@ -20,6 +20,7 @@ once_cell = { workspace = true }
 raw-cpuid = { workspace = true }
 rayon = { workspace = true }
 smartstring = { workspace = true }
+stacker = { workspace = true }
 sysinfo = { version = "0.30", default-features = false, optional = true }
 
 [dev-dependencies]
diff --git a/crates/polars-utils/src/lib.rs b/crates/polars-utils/src/lib.rs
index 575571b62985..c1e5673f1720 100644
--- a/crates/polars-utils/src/lib.rs
+++ b/crates/polars-utils/src/lib.rs
@@ -14,6 +14,7 @@ pub mod hashing;
 pub mod idx_vec;
 pub mod mem;
 pub mod min_max;
+pub mod recursion;
 pub mod slice;
 pub mod sort;
 pub mod sync;
diff --git a/crates/polars-utils/src/recursion.rs b/crates/polars-utils/src/recursion.rs
new file mode 100644
index 000000000000..a2db919dd07c
--- /dev/null
+++ b/crates/polars-utils/src/recursion.rs
@@ -0,0 +1,6 @@
+const STACK_SIZE_GUARANTEE: usize = 256 * 1024;
+const STACK_ALLOC_SIZE: usize = 2 * 1024 * 1024;
+
+pub fn with_dynamic_stack<R, F: FnOnce() -> R>(f: F) -> R {
+    stacker::maybe_grow(STACK_SIZE_GUARANTEE, STACK_ALLOC_SIZE, f)
+}
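
The new `with_dynamic_stack` helper is a thin wrapper around `stacker::maybe_grow(red_zone, stack_size, callback)`: while at least `red_zone` bytes (here 256 KiB) of stack remain, the callback runs on the current stack; otherwise it runs on a freshly allocated `stack_size`-byte (here 2 MiB) segment. Wrapping every recursive step — the `Clone` impl, the pushdown optimizers, the tree visitors — in this guard is what stops deeply nested `LogicalPlan`s from overflowing the thread stack. Below is a minimal standalone sketch of the same pattern using the same constants as `recursion.rs`; `Node`, `depth`, and the iterative `Drop` are hypothetical illustration, and only `stacker::maybe_grow` is the crate's real API.

// Sketch: guard a naturally recursive function with stacker, as the diff
// does for LogicalPlan recursion. Assumes `stacker = "0.1"` in Cargo.toml,
// matching the workspace pin above; `Node`/`depth` are illustrative only.
struct Node {
    child: Option<Box<Node>>,
}

impl Drop for Node {
    fn drop(&mut self) {
        // Drop the chain iteratively; the compiler-generated recursive drop
        // glue would itself overflow the stack on a list this deep.
        let mut child = self.child.take();
        while let Some(mut node) = child {
            child = node.child.take();
        }
    }
}

fn depth(node: &Node) -> usize {
    // Keep ~256 KiB of headroom; when we get closer to the stack's end than
    // that, continue the recursion on a freshly allocated 2 MiB segment.
    stacker::maybe_grow(256 * 1024, 2 * 1024 * 1024, || match &node.child {
        Some(child) => 1 + depth(child),
        None => 1,
    })
}

fn main() {
    // Deep enough to overflow a default thread stack without the guard.
    let mut node = Node { child: None };
    for _ in 0..1_000_000 {
        node = Node { child: Some(Box::new(node)) };
    }
    assert_eq!(depth(&node), 1_000_001);
}

The trade-off versus rewriting each traversal iteratively with an explicit work stack is minimal code churn: each recursive function keeps its shape, paying only a cheap stack-remaining check per call and an occasional segment allocation on pathological plans.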