From 5f3e182a1df03bcf8018776276ef5bc7db579245 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 16 Nov 2022 16:41:04 +0100 Subject: [PATCH 1/2] fix(rust, python): fix duplicate caches in cse and prevent quadratic behavior --- .../src/logical_plan/optimizer/cse.rs | 19 +++++++++++++++++-- py-polars/tests/unit/test_cse.py | 18 ++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cse.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cse.rs index e248bc0c3484..8b74325a3393 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cse.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cse.rs @@ -1,6 +1,6 @@ //! Common Subplan Elimination -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::hash::{BuildHasher, Hash, Hasher}; use polars_core::prelude::*; @@ -50,6 +50,10 @@ pub(super) fn collect_trails( collect_trails(*input_right, lp_arena, trails, id, true)?; } Union { inputs, .. } => { + if inputs.len() > 200 { + // don't even bother with cse on this many inputs + return None; + } let new_trail = trails.get(id).unwrap().clone(); let last_i = inputs.len() - 1; @@ -301,13 +305,21 @@ pub(crate) fn elim_cmn_subplans( // search from the leafs upwards and find the longest shared subplans let mut trail_ends = vec![]; + // if i matches j + // we don't need to search with j as they are equal + // this is very important as otherwise we get quadratic behavior + let mut to_skip = BTreeSet::new(); for i in 0..trails.len() { + if to_skip.contains(&i) { + continue; + } let trail_i = &trails[i]; // we only look forwards, then we traverse all combinations - for trail_j in trails.iter().skip(i + 1) { + for (j, trail_j) in trails.iter().enumerate().skip(i + 1) { if let Some(res) = longest_subgraph(trail_i, trail_j, lp_arena, expr_arena) { + to_skip.insert(j); trail_ends.push(res) } } @@ -359,6 +371,9 @@ pub(crate) fn elim_cmn_subplans( let node2 = combination.1 .0; let cache_id = match (cache_mapping.get(&node1), cache_mapping.get(&node2)) { + // (Some(_), Some(_)) => { + // continue + // } (Some(h), _) => *h, (_, Some(h)) => *h, _ => { diff --git a/py-polars/tests/unit/test_cse.py b/py-polars/tests/unit/test_cse.py index 3a27100726e0..b19476254c4f 100644 --- a/py-polars/tests/unit/test_cse.py +++ b/py-polars/tests/unit/test_cse.py @@ -1,3 +1,5 @@ +import re + import polars as pl @@ -13,3 +15,19 @@ def test_cse_rename_cross_join_5405() -> None: "A": [1, 2, 1, 2], "D": [5, None, None, 6], } + + +def test_union_duplicates() -> None: + n_dfs = 10 + df_lazy = pl.DataFrame({}).lazy() + lazy_dfs = [df_lazy for _ in range(n_dfs)] + assert ( + len( + re.findall( + r".*CACHE\[id: .*, count: 9].*", + pl.concat(lazy_dfs).describe_optimized_plan(), + flags=re.MULTILINE, + ) + ) + == 10 + ) From aa7b09535bb1826e884f8afed33f3512b42cf752 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 16 Nov 2022 17:28:48 +0100 Subject: [PATCH 2/2] only skip if entirely equal --- .../src/logical_plan/optimizer/cse.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cse.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cse.rs index 8b74325a3393..f77466feebed 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cse.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cse.rs @@ -250,7 +250,7 @@ fn longest_subgraph( trail_b: &Trail, lp_arena: &Arena, expr_arena: &Arena, -) -> Option<(Node, Node)> { +) -> Option<(Node, Node, bool)> { if trail_a.is_empty() || trail_b.is_empty() { return None; } @@ -258,8 +258,9 @@ fn longest_subgraph( let mut prev_node_b = Node(0); let mut is_equal; let mut i = 0; + let mut entirely_equal = trail_a.len() == trail_b.len(); - // iterates from the leafs upwards + // iterates from the leaves upwards for (node_a, node_b) in trail_a.iter().rev().zip(trail_b.iter().rev()) { // we never include the root that splits a trail // e.g. don't want to cache the join/union, but @@ -273,6 +274,7 @@ fn longest_subgraph( is_equal = lp_node_equal(a, b, expr_arena); if !is_equal { + entirely_equal = false; break; } @@ -282,7 +284,7 @@ fn longest_subgraph( } // previous node was equal if i > 0 { - Some((prev_node_a, prev_node_b)) + Some((prev_node_a, prev_node_b, entirely_equal)) } else { None } @@ -318,9 +320,14 @@ pub(crate) fn elim_cmn_subplans( // we only look forwards, then we traverse all combinations for (j, trail_j) in trails.iter().enumerate().skip(i + 1) { - if let Some(res) = longest_subgraph(trail_i, trail_j, lp_arena, expr_arena) { - to_skip.insert(j); - trail_ends.push(res) + if let Some((a, b, all_equal)) = + longest_subgraph(trail_i, trail_j, lp_arena, expr_arena) + { + // then we can skip `j` as we already searched with trail `i` which is equal + if all_equal { + to_skip.insert(j); + } + trail_ends.push((a, b)) } } }