From 815c061baa802aa72754eb247f462ef19ce17d50 Mon Sep 17 00:00:00 2001 From: hhj Date: Mon, 23 Oct 2023 17:14:26 +0800 Subject: [PATCH 01/11] fix: don't push down volatile predicates in projection --- datafusion/optimizer/src/push_down_filter.rs | 144 +++++++++++++++++-- 1 file changed, 130 insertions(+), 14 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 4c5cd3ab2855..252f9f9d49da 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -15,13 +15,14 @@ //! Push Down Filter optimizer rule ensures that filters are applied as early as possible in the plan use crate::optimizer::ApplyOrder; -use crate::utils::{conjunction, split_conjunction}; +use crate::utils::{conjunction, split_conjunction, split_conjunction_owned}; use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{ internal_err, plan_datafusion_err, Column, DFSchema, DataFusionError, Result, }; use datafusion_expr::expr::Alias; +use datafusion_expr::Volatility; use datafusion_expr::{ and, expr_rewriter::replace_col, @@ -652,9 +653,10 @@ impl OptimizerRule for PushDownFilter { child_plan.with_new_inputs(&[new_filter])? } LogicalPlan::Projection(projection) => { - // A projection is filter-commutable, but re-writes all predicate expressions + // A projection is filter-commutable if it do not contain volatile predicates or contain volatile + // predicates that are not used in the filter. However, we should re-writes all predicate expressions. // collect projection. - let replace_map = projection + let (volatile_map, non_volatile_map): (HashMap<_,_>, HashMap<_,_>) = projection .schema .fields() .iter() @@ -668,16 +670,40 @@ impl OptimizerRule for PushDownFilter { (field.qualified_name(), expr) }) - .collect::>(); + .partition(|(_, value)| matches!(value, Expr::ScalarFunction(f) if f.fun.volatility() == Volatility::Volatile)); - // re-write all filters based on this projection - // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1" - let new_filter = LogicalPlan::Filter(Filter::try_new( - replace_cols_by_name(filter.predicate.clone(), &replace_map)?, - projection.input.clone(), - )?); + let mut push_predicates = vec![]; + let mut keep_predicates = vec![]; + for expr in split_conjunction_owned(filter.predicate.clone()).into_iter() + { + if contain(expr.clone(), &volatile_map.clone())? { + keep_predicates.push(expr); + } else { + push_predicates.push(expr); + } + } - child_plan.with_new_inputs(&[new_filter])? + match conjunction(push_predicates) { + Some(expr) => { + // re-write all filters based on this projection + // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1" + let new_filter = LogicalPlan::Filter(Filter::try_new( + replace_cols_by_name(expr, &non_volatile_map)?, + projection.input.clone(), + )?); + + if keep_predicates.is_empty() { + child_plan.with_new_inputs(&[new_filter])? + } else { + let child_plan = child_plan.with_new_inputs(&[new_filter])?; + LogicalPlan::Filter(Filter::try_new( + conjunction(keep_predicates).unwrap(), + Arc::new(child_plan), + )?) + } + } + None => return Ok(None), + } } LogicalPlan::Union(union) => { let mut inputs = Vec::with_capacity(union.inputs.len()); @@ -881,6 +907,25 @@ pub fn replace_cols_by_name( }) } +/// check whether the expression uses the columns in `check_map`. +pub fn contain(e: Expr, check_map: &HashMap) -> Result { + let mut is_contain = false; + e.apply(&mut |expr| { + Ok(if let Expr::Column(c) = &expr { + match check_map.get(&c.flat_name()) { + Some(_) => { + is_contain = true; + VisitRecursion::Stop + } + None => VisitRecursion::Continue, + } + } else { + VisitRecursion::Continue + }) + })?; + Ok(is_contain) +} + #[cfg(test)] mod tests { use super::*; @@ -893,9 +938,9 @@ mod tests { use datafusion_common::{DFSchema, DFSchemaRef}; use datafusion_expr::logical_plan::table_scan; use datafusion_expr::{ - and, col, in_list, in_subquery, lit, logical_plan::JoinType, or, sum, BinaryExpr, - Expr, Extension, LogicalPlanBuilder, Operator, TableSource, TableType, - UserDefinedLogicalNodeCore, + and, col, in_list, in_subquery, lit, logical_plan::JoinType, or, random, sum, + BinaryExpr, Expr, Extension, LogicalPlanBuilder, Operator, TableSource, + TableType, UserDefinedLogicalNodeCore, }; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -2712,4 +2757,75 @@ Projection: a, b \n TableScan: test2"; assert_optimized_plan_eq(&plan, expected) } + + #[test] + fn test_push_down_volatile_function_in_aggregate() -> Result<()> { + // SELECT t.a, t.r FROM (SELECT a, SUM(b), random() AS r FROM test1 GROUP BY a) AS t WHERE t.a > 5 AND t.r > 0.5; + let table_scan = test_table_scan_with_name("test1")?; + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .project(vec![col("a"), sum(col("b")), random().alias("r")])? + .alias("t")? + .filter(col("t.a").gt(lit(5)).and(col("t.r").gt(lit(0.5))))? + .project(vec![col("t.a"), col("t.r")])? + .build()?; + + let expected_before = "Projection: t.a, t.r\ + \n Filter: t.a > Int32(5) AND t.r > Float64(0.5)\ + \n SubqueryAlias: t\ + \n Projection: test1.a, SUM(test1.b), random() AS r\ + \n Aggregate: groupBy=[[test1.a]], aggr=[[SUM(test1.b)]]\ + \n TableScan: test1"; + assert_eq!(format!("{plan:?}"), expected_before); + + let expected_after = "Projection: t.a, t.r\ + \n SubqueryAlias: t\ + \n Filter: r > Float64(0.5)\ + \n Projection: test1.a, SUM(test1.b), random() AS r\ + \n Aggregate: groupBy=[[test1.a]], aggr=[[SUM(test1.b)]]\ + \n TableScan: test1, full_filters=[test1.a > Int32(5)]"; + assert_optimized_plan_eq(&plan, expected_after) + } + + #[test] + fn test_push_down_volatile_function_in_join() -> Result<()> { + // SELECT t.a, t.r FROM (SELECT test1.a AS a, random() AS r FROM test1 join test2 ON test1.a = test2.a) AS t WHERE t.r > 0.5; + let table_scan = test_table_scan_with_name("test1")?; + let left = LogicalPlanBuilder::from(table_scan).build()?; + let right_table_scan = test_table_scan_with_name("test2")?; + let right = LogicalPlanBuilder::from(right_table_scan).build()?; + let plan = LogicalPlanBuilder::from(left) + .join( + right, + JoinType::Inner, + ( + vec![Column::from_qualified_name("test1.a")], + vec![Column::from_qualified_name("test2.a")], + ), + None, + )? + .project(vec![col("test1.a").alias("a"), random().alias("r")])? + .alias("t")? + .filter(col("t.r").gt(lit(0.8)))? + .project(vec![col("t.a"), col("t.r")])? + .build()?; + + let expected_before = "Projection: t.a, t.r\ + \n Filter: t.r > Float64(0.8)\ + \n SubqueryAlias: t\ + \n Projection: test1.a AS a, random() AS r\ + \n Inner Join: test1.a = test2.a\ + \n TableScan: test1\ + \n TableScan: test2"; + assert_eq!(format!("{plan:?}"), expected_before); + + let expected = "Projection: t.a, t.r\ + \n SubqueryAlias: t\ + \n Filter: r > Float64(0.8)\ + \n Projection: test1.a AS a, random() AS r\ + \n Inner Join: test1.a = test2.a\ + \n TableScan: test1\ + \n TableScan: test2"; + assert_optimized_plan_eq(&plan, expected) + } } From 589be272bc1fc0679e2dc482b681d9fb0c629187 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Tue, 24 Oct 2023 10:04:50 +0800 Subject: [PATCH 02/11] Update datafusion/optimizer/src/push_down_filter.rs Co-authored-by: Andrew Lamb --- datafusion/optimizer/src/push_down_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 252f9f9d49da..b590fa90a5f2 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -676,7 +676,7 @@ impl OptimizerRule for PushDownFilter { let mut keep_predicates = vec![]; for expr in split_conjunction_owned(filter.predicate.clone()).into_iter() { - if contain(expr.clone(), &volatile_map.clone())? { + if contain(expr.clone(), volatile_map)? { keep_predicates.push(expr); } else { push_predicates.push(expr); From 653d8809e88107b1421567ea1172c30aa6e2a0ea Mon Sep 17 00:00:00 2001 From: Huaijin Date: Tue, 24 Oct 2023 10:04:59 +0800 Subject: [PATCH 03/11] Update datafusion/optimizer/src/push_down_filter.rs Co-authored-by: Andrew Lamb --- datafusion/optimizer/src/push_down_filter.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index b590fa90a5f2..d3e2c1d5eaea 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -692,12 +692,12 @@ impl OptimizerRule for PushDownFilter { projection.input.clone(), )?); - if keep_predicates.is_empty() { - child_plan.with_new_inputs(&[new_filter])? - } else { + match conjunction(keep_predicates) { + None => child_plan.with_new_inputs(&[new_filter])? + Some(keep_predicate) => { let child_plan = child_plan.with_new_inputs(&[new_filter])?; LogicalPlan::Filter(Filter::try_new( - conjunction(keep_predicates).unwrap(), + keep_predicate Arc::new(child_plan), )?) } From 8802de6c1643e55559b3063c12e428f36b6bd107 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Tue, 24 Oct 2023 10:05:20 +0800 Subject: [PATCH 04/11] Update datafusion/optimizer/src/push_down_filter.rs Co-authored-by: Andrew Lamb --- datafusion/optimizer/src/push_down_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index d3e2c1d5eaea..7bc43fe15476 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -2764,7 +2764,7 @@ Projection: a, b let table_scan = test_table_scan_with_name("test1")?; let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("a")], vec![sum(col("b"))])? - .project(vec![col("a"), sum(col("b")), random().alias("r")])? + .project(vec![col("a"), sum(col("b")), random().add(lit(1)).alias("r")])? .alias("t")? .filter(col("t.a").gt(lit(5)).and(col("t.r").gt(lit(0.5))))? .project(vec![col("t.a"), col("t.r")])? From 515d5e208eea5828c256745aeb8e2bb9e6ba61b2 Mon Sep 17 00:00:00 2001 From: hhj Date: Tue, 24 Oct 2023 11:06:31 +0800 Subject: [PATCH 05/11] add suggestions --- datafusion/optimizer/src/push_down_filter.rs | 56 ++++++++++++++------ 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 7bc43fe15476..dd2b67b6d05e 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -676,7 +676,7 @@ impl OptimizerRule for PushDownFilter { let mut keep_predicates = vec![]; for expr in split_conjunction_owned(filter.predicate.clone()).into_iter() { - if contain(expr.clone(), volatile_map)? { + if contain(expr.clone(), &volatile_map) { keep_predicates.push(expr); } else { push_predicates.push(expr); @@ -693,13 +693,15 @@ impl OptimizerRule for PushDownFilter { )?); match conjunction(keep_predicates) { - None => child_plan.with_new_inputs(&[new_filter])? - Some(keep_predicate) => { - let child_plan = child_plan.with_new_inputs(&[new_filter])?; - LogicalPlan::Filter(Filter::try_new( - keep_predicate - Arc::new(child_plan), - )?) + None => child_plan.with_new_inputs(&[new_filter])?, + Some(keep_predicate) => { + let child_plan = + child_plan.with_new_inputs(&[new_filter])?; + LogicalPlan::Filter(Filter::try_new( + keep_predicate, + Arc::new(child_plan), + )?) + } } } None => return Ok(None), @@ -907,8 +909,24 @@ pub fn replace_cols_by_name( }) } +/// check whether the expression is volatile predicates +pub fn is_volatile_expression(e: Expr) -> bool { + let mut is_volatile = false; + e.apply(&mut |expr| { + Ok(match expr { + Expr::ScalarFunction(f) if f.fun.volatility() == Volatility::Volatile => { + is_volatile = true; + VisitRecursion::Stop + } + _ => VisitRecursion::Continue, + }) + }) + .unwrap(); + is_volatile +} + /// check whether the expression uses the columns in `check_map`. -pub fn contain(e: Expr, check_map: &HashMap) -> Result { +pub fn contain(e: Expr, check_map: &HashMap) -> bool { let mut is_contain = false; e.apply(&mut |expr| { Ok(if let Expr::Column(c) = &expr { @@ -922,8 +940,9 @@ pub fn contain(e: Expr, check_map: &HashMap) -> Result { } else { VisitRecursion::Continue }) - })?; - Ok(is_contain) + }) + .unwrap(); + is_contain } #[cfg(test)] @@ -2764,7 +2783,11 @@ Projection: a, b let table_scan = test_table_scan_with_name("test1")?; let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("a")], vec![sum(col("b"))])? - .project(vec![col("a"), sum(col("b")), random().add(lit(1)).alias("r")])? + .project(vec![ + col("a"), + sum(col("b")), + add(random(), lit(1)).alias("r"), + ])? .alias("t")? .filter(col("t.a").gt(lit(5)).and(col("t.r").gt(lit(0.5))))? .project(vec![col("t.a"), col("t.r")])? @@ -2773,17 +2796,16 @@ Projection: a, b let expected_before = "Projection: t.a, t.r\ \n Filter: t.a > Int32(5) AND t.r > Float64(0.5)\ \n SubqueryAlias: t\ - \n Projection: test1.a, SUM(test1.b), random() AS r\ + \n Projection: test1.a, SUM(test1.b), random() + Int32(1) AS r\ \n Aggregate: groupBy=[[test1.a]], aggr=[[SUM(test1.b)]]\ \n TableScan: test1"; assert_eq!(format!("{plan:?}"), expected_before); let expected_after = "Projection: t.a, t.r\ \n SubqueryAlias: t\ - \n Filter: r > Float64(0.5)\ - \n Projection: test1.a, SUM(test1.b), random() AS r\ - \n Aggregate: groupBy=[[test1.a]], aggr=[[SUM(test1.b)]]\ - \n TableScan: test1, full_filters=[test1.a > Int32(5)]"; + \n Projection: test1.a, SUM(test1.b), random() + Int32(1) AS r\ + \n Aggregate: groupBy=[[test1.a]], aggr=[[SUM(test1.b)]]\ + \n TableScan: test1, full_filters=[test1.a > Int32(5), random() + Int32(1) > Float64(0.5)]"; assert_optimized_plan_eq(&plan, expected_after) } From eb929372731e4999f2a9b1baba4deb286acf7614 Mon Sep 17 00:00:00 2001 From: hhj Date: Tue, 24 Oct 2023 11:14:42 +0800 Subject: [PATCH 06/11] fix --- datafusion/optimizer/src/push_down_filter.rs | 38 ++++++++++---------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index dd2b67b6d05e..d77a760b9f9a 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -656,21 +656,22 @@ impl OptimizerRule for PushDownFilter { // A projection is filter-commutable if it do not contain volatile predicates or contain volatile // predicates that are not used in the filter. However, we should re-writes all predicate expressions. // collect projection. - let (volatile_map, non_volatile_map): (HashMap<_,_>, HashMap<_,_>) = projection - .schema - .fields() - .iter() - .enumerate() - .map(|(i, field)| { - // strip alias, as they should not be part of filters - let expr = match &projection.expr[i] { - Expr::Alias(Alias { expr, .. }) => expr.as_ref().clone(), - expr => expr.clone(), - }; - - (field.qualified_name(), expr) - }) - .partition(|(_, value)| matches!(value, Expr::ScalarFunction(f) if f.fun.volatility() == Volatility::Volatile)); + let (volatile_map, non_volatile_map): (HashMap<_, _>, HashMap<_, _>) = + projection + .schema + .fields() + .iter() + .enumerate() + .map(|(i, field)| { + // strip alias, as they should not be part of filters + let expr = match &projection.expr[i] { + Expr::Alias(Alias { expr, .. }) => expr.as_ref().clone(), + expr => expr.clone(), + }; + + (field.qualified_name(), expr) + }) + .partition(|(_, value)| is_volatile_expression(value.clone())); let mut push_predicates = vec![]; let mut keep_predicates = vec![]; @@ -2803,9 +2804,10 @@ Projection: a, b let expected_after = "Projection: t.a, t.r\ \n SubqueryAlias: t\ - \n Projection: test1.a, SUM(test1.b), random() + Int32(1) AS r\ - \n Aggregate: groupBy=[[test1.a]], aggr=[[SUM(test1.b)]]\ - \n TableScan: test1, full_filters=[test1.a > Int32(5), random() + Int32(1) > Float64(0.5)]"; + \n Filter: r > Float64(0.5)\ + \n Projection: test1.a, SUM(test1.b), random() + Int32(1) AS r\ + \n Aggregate: groupBy=[[test1.a]], aggr=[[SUM(test1.b)]]\ + \n TableScan: test1, full_filters=[test1.a > Int32(5)]"; assert_optimized_plan_eq(&plan, expected_after) } From c6b41edecaac15a9f9778fd320748faa835d2800 Mon Sep 17 00:00:00 2001 From: hhj Date: Tue, 24 Oct 2023 11:25:04 +0800 Subject: [PATCH 07/11] fix doc --- datafusion/optimizer/src/push_down_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index d77a760b9f9a..aee138277529 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -2780,7 +2780,7 @@ Projection: a, b #[test] fn test_push_down_volatile_function_in_aggregate() -> Result<()> { - // SELECT t.a, t.r FROM (SELECT a, SUM(b), random() AS r FROM test1 GROUP BY a) AS t WHERE t.a > 5 AND t.r > 0.5; + // SELECT t.a, t.r FROM (SELECT a, SUM(b), random()+1 AS r FROM test1 GROUP BY a) AS t WHERE t.a > 5 AND t.r > 0.5; let table_scan = test_table_scan_with_name("test1")?; let plan = LogicalPlanBuilder::from(table_scan) .aggregate(vec![col("a")], vec![sum(col("b"))])? From d4ad5c21a4a032686279ea2ab9c44d4796dda3f3 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Tue, 24 Oct 2023 16:14:57 +0800 Subject: [PATCH 08/11] Update datafusion/optimizer/src/push_down_filter.rs Co-authored-by: Jonah Gao --- datafusion/optimizer/src/push_down_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index aee138277529..c91f82290ce8 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -911,7 +911,7 @@ pub fn replace_cols_by_name( } /// check whether the expression is volatile predicates -pub fn is_volatile_expression(e: Expr) -> bool { +fn is_volatile_expression(e: &Expr) -> bool { let mut is_volatile = false; e.apply(&mut |expr| { Ok(match expr { From 9d20700151a23faeb1cd470a0830518ee8bc8310 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Tue, 24 Oct 2023 16:15:07 +0800 Subject: [PATCH 09/11] Update datafusion/optimizer/src/push_down_filter.rs Co-authored-by: Jonah Gao --- datafusion/optimizer/src/push_down_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index c91f82290ce8..39086c157c69 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -671,7 +671,7 @@ impl OptimizerRule for PushDownFilter { (field.qualified_name(), expr) }) - .partition(|(_, value)| is_volatile_expression(value.clone())); + .partition(|(_, value)| is_volatile_expression(value)); let mut push_predicates = vec![]; let mut keep_predicates = vec![]; From 798d323f1aa84c963e18c07181028e9d1927c93a Mon Sep 17 00:00:00 2001 From: Huaijin Date: Tue, 24 Oct 2023 16:15:13 +0800 Subject: [PATCH 10/11] Update datafusion/optimizer/src/push_down_filter.rs Co-authored-by: Jonah Gao --- datafusion/optimizer/src/push_down_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 39086c157c69..46d7660cebae 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -677,7 +677,7 @@ impl OptimizerRule for PushDownFilter { let mut keep_predicates = vec![]; for expr in split_conjunction_owned(filter.predicate.clone()).into_iter() { - if contain(expr.clone(), &volatile_map) { + if contain(&expr, &volatile_map) { keep_predicates.push(expr); } else { push_predicates.push(expr); From 46f01785346a79bc33732b7cf76059ad11e85ac4 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Tue, 24 Oct 2023 16:15:21 +0800 Subject: [PATCH 11/11] Update datafusion/optimizer/src/push_down_filter.rs Co-authored-by: Jonah Gao --- datafusion/optimizer/src/push_down_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 46d7660cebae..8c2eb96a48d8 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -927,7 +927,7 @@ fn is_volatile_expression(e: &Expr) -> bool { } /// check whether the expression uses the columns in `check_map`. -pub fn contain(e: Expr, check_map: &HashMap) -> bool { +fn contain(e: &Expr, check_map: &HashMap) -> bool { let mut is_contain = false; e.apply(&mut |expr| { Ok(if let Expr::Column(c) = &expr {