From de682da835ec39eeb2c086e6f5f71e69eb942b5e Mon Sep 17 00:00:00 2001 From: Jesse Bakker Date: Wed, 29 Nov 2023 10:27:05 +0100 Subject: [PATCH] Move some datafusion-optimizer::utils down to datafusion-expr::utils These utils manipulate `LogicalPlan`s and `Expr`s and may be useful in projects that only depend on `datafusion-expr` --- benchmarks/src/parquet_filter.rs | 2 +- .../core/src/datasource/listing/table.rs | 4 +- .../core/tests/parquet/filter_pushdown.rs | 2 +- datafusion/expr/src/utils.rs | 379 ++++++++++++++++- datafusion/optimizer/src/analyzer/subquery.rs | 3 +- .../optimizer/src/analyzer/type_coercion.rs | 7 +- datafusion/optimizer/src/decorrelate.rs | 5 +- .../src/decorrelate_predicate_subquery.rs | 3 +- .../src/extract_equijoin_predicate.rs | 3 +- datafusion/optimizer/src/optimizer.rs | 2 +- datafusion/optimizer/src/push_down_filter.rs | 16 +- .../optimizer/src/scalar_subquery_to_join.rs | 3 +- .../simplify_expressions/simplify_exprs.rs | 2 +- .../src/unwrap_cast_in_comparison.rs | 2 +- datafusion/optimizer/src/utils.rs | 399 +++++------------- .../substrait/src/logical_plan/consumer.rs | 2 +- 16 files changed, 498 insertions(+), 336 deletions(-) diff --git a/benchmarks/src/parquet_filter.rs b/benchmarks/src/parquet_filter.rs index e19596b80f54e..1d816908e2b04 100644 --- a/benchmarks/src/parquet_filter.rs +++ b/benchmarks/src/parquet_filter.rs @@ -19,8 +19,8 @@ use crate::AccessLogOpt; use crate::{BenchmarkRun, CommonOpt}; use arrow::util::pretty; use datafusion::common::Result; +use datafusion::logical_expr::utils::disjunction; use datafusion::logical_expr::{lit, or, Expr}; -use datafusion::optimizer::utils::disjunction; use datafusion::physical_plan::collect; use datafusion::prelude::{col, SessionContext}; use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile}; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 515bc8a9e612b..a3be57db3a83e 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -40,11 +40,10 @@ use crate::datasource::{ physical_plan::{is_plan_streaming, FileScanConfig, FileSinkConfig}, TableProvider, TableType, }; -use crate::logical_expr::TableProviderFilterPushDown; use crate::{ error::{DataFusionError, Result}, execution::context::SessionState, - logical_expr::Expr, + logical_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}, physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}, }; @@ -56,7 +55,6 @@ use datafusion_common::{ }; use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; -use datafusion_optimizer::utils::conjunction; use datafusion_physical_expr::{ create_physical_expr, LexOrdering, PhysicalSortRequirement, }; diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index 61a8f87b9ea58..f214e8903a4f8 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -34,7 +34,7 @@ use datafusion::physical_plan::collect; use datafusion::physical_plan::metrics::MetricsSet; use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext}; use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile}; -use datafusion_optimizer::utils::{conjunction, disjunction, split_conjunction}; +use datafusion_expr::utils::{conjunction, disjunction, split_conjunction}; use itertools::Itertools; use parquet::file::properties::WriterProperties; use tempfile::TempDir; diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index d8668fba8e1e4..f577d5fc13dbb 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -18,9 +18,13 @@ //! Expression utilities use crate::expr::{Alias, Sort, WindowFunction}; +use crate::expr_rewriter::strip_outer_reference; use crate::logical_plan::Aggregate; use crate::signature::{Signature, TypeSignature}; -use crate::{Cast, Expr, ExprSchemable, GroupingSet, LogicalPlan, TryCast}; +use crate::{ + and, BinaryExpr, Cast, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan, + Operator, TryCast, +}; use arrow::datatypes::{DataType, TimeUnit}; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{ @@ -30,6 +34,7 @@ use datafusion_common::{ use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, WildcardAdditionalOptions}; use std::cmp::Ordering; use std::collections::HashSet; +use std::sync::Arc; /// The value to which `COUNT(*)` is expanded to in /// `COUNT()` expressions @@ -1004,6 +1009,239 @@ pub fn generate_signature_error_msg( ) } +/// Splits a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]` +/// +/// See [`split_conjunction_owned`] for more details and an example. +pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> { + split_conjunction_impl(expr, vec![]) +} + +fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<&'a Expr> { + match expr { + Expr::BinaryExpr(BinaryExpr { + right, + op: Operator::And, + left, + }) => { + let exprs = split_conjunction_impl(left, exprs); + split_conjunction_impl(right, exprs) + } + Expr::Alias(Alias { expr, .. }) => split_conjunction_impl(expr, exprs), + other => { + exprs.push(other); + exprs + } + } +} + +/// Splits an owned conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]` +/// +/// This is often used to "split" filter expressions such as `col1 = 5 +/// AND col2 = 10` into [`col1 = 5`, `col2 = 10`]; +/// +/// # Example +/// ``` +/// # use datafusion_expr::{col, lit}; +/// # use datafusion_optimizer::utils::split_conjunction_owned; +/// // a=1 AND b=2 +/// let expr = col("a").eq(lit(1)).and(col("b").eq(lit(2))); +/// +/// // [a=1, b=2] +/// let split = vec![ +/// col("a").eq(lit(1)), +/// col("b").eq(lit(2)), +/// ]; +/// +/// // use split_conjunction_owned to split them +/// assert_eq!(split_conjunction_owned(expr), split); +/// ``` +pub fn split_conjunction_owned(expr: Expr) -> Vec { + split_binary_owned(expr, Operator::And) +} + +/// Splits an owned binary operator tree [`Expr`] such as `A B C` => `[A, B, C]` +/// +/// This is often used to "split" expressions such as `col1 = 5 +/// AND col2 = 10` into [`col1 = 5`, `col2 = 10`]; +/// +/// # Example +/// ``` +/// # use datafusion_expr::{col, lit, Operator}; +/// # use datafusion_optimizer::utils::split_binary_owned; +/// # use std::ops::Add; +/// // a=1 + b=2 +/// let expr = col("a").eq(lit(1)).add(col("b").eq(lit(2))); +/// +/// // [a=1, b=2] +/// let split = vec![ +/// col("a").eq(lit(1)), +/// col("b").eq(lit(2)), +/// ]; +/// +/// // use split_binary_owned to split them +/// assert_eq!(split_binary_owned(expr, Operator::Plus), split); +/// ``` +pub fn split_binary_owned(expr: Expr, op: Operator) -> Vec { + split_binary_owned_impl(expr, op, vec![]) +} + +fn split_binary_owned_impl( + expr: Expr, + operator: Operator, + mut exprs: Vec, +) -> Vec { + match expr { + Expr::BinaryExpr(BinaryExpr { right, op, left }) if op == operator => { + let exprs = split_binary_owned_impl(*left, operator, exprs); + split_binary_owned_impl(*right, operator, exprs) + } + Expr::Alias(Alias { expr, .. }) => { + split_binary_owned_impl(*expr, operator, exprs) + } + other => { + exprs.push(other); + exprs + } + } +} + +/// Splits an binary operator tree [`Expr`] such as `A B C` => `[A, B, C]` +/// +/// See [`split_binary_owned`] for more details and an example. +pub fn split_binary(expr: &Expr, op: Operator) -> Vec<&Expr> { + split_binary_impl(expr, op, vec![]) +} + +fn split_binary_impl<'a>( + expr: &'a Expr, + operator: Operator, + mut exprs: Vec<&'a Expr>, +) -> Vec<&'a Expr> { + match expr { + Expr::BinaryExpr(BinaryExpr { right, op, left }) if *op == operator => { + let exprs = split_binary_impl(left, operator, exprs); + split_binary_impl(right, operator, exprs) + } + Expr::Alias(Alias { expr, .. }) => split_binary_impl(expr, operator, exprs), + other => { + exprs.push(other); + exprs + } + } +} + +/// Combines an array of filter expressions into a single filter +/// expression consisting of the input filter expressions joined with +/// logical AND. +/// +/// Returns None if the filters array is empty. +/// +/// # Example +/// ``` +/// # use datafusion_expr::{col, lit}; +/// # use datafusion_optimizer::utils::conjunction; +/// // a=1 AND b=2 +/// let expr = col("a").eq(lit(1)).and(col("b").eq(lit(2))); +/// +/// // [a=1, b=2] +/// let split = vec![ +/// col("a").eq(lit(1)), +/// col("b").eq(lit(2)), +/// ]; +/// +/// // use conjunction to join them together with `AND` +/// assert_eq!(conjunction(split), Some(expr)); +/// ``` +pub fn conjunction(filters: impl IntoIterator) -> Option { + filters.into_iter().reduce(|accum, expr| accum.and(expr)) +} + +/// Combines an array of filter expressions into a single filter +/// expression consisting of the input filter expressions joined with +/// logical OR. +/// +/// Returns None if the filters array is empty. +pub fn disjunction(filters: impl IntoIterator) -> Option { + filters.into_iter().reduce(|accum, expr| accum.or(expr)) +} + +/// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with +/// its predicate be all `predicates` ANDed. +pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> Result { + // reduce filters to a single filter with an AND + let predicate = predicates + .iter() + .skip(1) + .fold(predicates[0].clone(), |acc, predicate| { + and(acc, (*predicate).to_owned()) + }); + + Ok(LogicalPlan::Filter(Filter::try_new( + predicate, + Arc::new(plan), + )?)) +} + +/// Looks for correlating expressions: for example, a binary expression with one field from the subquery, and +/// one not in the subquery (closed upon from outer scope) +/// +/// # Arguments +/// +/// * `exprs` - List of expressions that may or may not be joins +/// +/// # Return value +/// +/// Tuple of (expressions containing joins, remaining non-join expressions) +pub fn find_join_exprs(exprs: Vec<&Expr>) -> Result<(Vec, Vec)> { + let mut joins = vec![]; + let mut others = vec![]; + for filter in exprs.into_iter() { + // If the expression contains correlated predicates, add it to join filters + if filter.contains_outer() { + if !matches!(filter, Expr::BinaryExpr(BinaryExpr{ left, op: Operator::Eq, right }) if left.eq(right)) + { + joins.push(strip_outer_reference((*filter).clone())); + } + } else { + others.push((*filter).clone()); + } + } + + Ok((joins, others)) +} + +/// Returns the first (and only) element in a slice, or an error +/// +/// # Arguments +/// +/// * `slice` - The slice to extract from +/// +/// # Return value +/// +/// The first element, or an error +pub fn only_or_err(slice: &[T]) -> Result<&T> { + match slice { + [it] => Ok(it), + [] => plan_err!("No items found!"), + _ => plan_err!("More than one item found!"), + } +} + +/// merge inputs schema into a single schema. +pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema { + if inputs.len() == 1 { + inputs[0].schema().clone().as_ref().clone() + } else { + inputs.iter().map(|input| input.schema()).fold( + DFSchema::empty(), + |mut lhs, rhs| { + lhs.merge(rhs); + lhs + }, + ) + } +} + #[cfg(test)] mod tests { use super::*; @@ -1322,4 +1560,143 @@ mod tests { Ok(()) } + #[test] + fn test_split_conjunction() { + let expr = col("a"); + let result = split_conjunction(&expr); + assert_eq!(result, vec![&expr]); + } + + #[test] + fn test_split_conjunction_two() { + let expr = col("a").eq(lit(5)).and(col("b")); + let expr1 = col("a").eq(lit(5)); + let expr2 = col("b"); + + let result = split_conjunction(&expr); + assert_eq!(result, vec![&expr1, &expr2]); + } + + #[test] + fn test_split_conjunction_alias() { + let expr = col("a").eq(lit(5)).and(col("b").alias("the_alias")); + let expr1 = col("a").eq(lit(5)); + let expr2 = col("b"); // has no alias + + let result = split_conjunction(&expr); + assert_eq!(result, vec![&expr1, &expr2]); + } + + #[test] + fn test_split_conjunction_or() { + let expr = col("a").eq(lit(5)).or(col("b")); + let result = split_conjunction(&expr); + assert_eq!(result, vec![&expr]); + } + + #[test] + fn test_split_binary_owned() { + let expr = col("a"); + assert_eq!(split_binary_owned(expr.clone(), Operator::And), vec![expr]); + } + + #[test] + fn test_split_binary_owned_two() { + assert_eq!( + split_binary_owned(col("a").eq(lit(5)).and(col("b")), Operator::And), + vec![col("a").eq(lit(5)), col("b")] + ); + } + + #[test] + fn test_split_binary_owned_different_op() { + let expr = col("a").eq(lit(5)).or(col("b")); + assert_eq!( + // expr is connected by OR, but pass in AND + split_binary_owned(expr.clone(), Operator::And), + vec![expr] + ); + } + + #[test] + fn test_split_conjunction_owned() { + let expr = col("a"); + assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]); + } + + #[test] + fn test_split_conjunction_owned_two() { + assert_eq!( + split_conjunction_owned(col("a").eq(lit(5)).and(col("b"))), + vec![col("a").eq(lit(5)), col("b")] + ); + } + + #[test] + fn test_split_conjunction_owned_alias() { + assert_eq!( + split_conjunction_owned(col("a").eq(lit(5)).and(col("b").alias("the_alias"))), + vec![ + col("a").eq(lit(5)), + // no alias on b + col("b"), + ] + ); + } + + #[test] + fn test_conjunction_empty() { + assert_eq!(conjunction(vec![]), None); + } + + #[test] + fn test_conjunction() { + // `[A, B, C]` + let expr = conjunction(vec![col("a"), col("b"), col("c")]); + + // --> `(A AND B) AND C` + assert_eq!(expr, Some(col("a").and(col("b")).and(col("c")))); + + // which is different than `A AND (B AND C)` + assert_ne!(expr, Some(col("a").and(col("b").and(col("c"))))); + } + + #[test] + fn test_disjunction_empty() { + assert_eq!(disjunction(vec![]), None); + } + + #[test] + fn test_disjunction() { + // `[A, B, C]` + let expr = disjunction(vec![col("a"), col("b"), col("c")]); + + // --> `(A OR B) OR C` + assert_eq!(expr, Some(col("a").or(col("b")).or(col("c")))); + + // which is different than `A OR (B OR C)` + assert_ne!(expr, Some(col("a").or(col("b").or(col("c"))))); + } + + #[test] + fn test_split_conjunction_owned_or() { + let expr = col("a").eq(lit(5)).or(col("b")); + assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]); + } + + #[test] + fn test_collect_expr() -> Result<()> { + let mut accum: HashSet = HashSet::new(); + expr_to_columns( + &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)), + &mut accum, + )?; + expr_to_columns( + &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)), + &mut accum, + )?; + assert_eq!(1, accum.len()); + assert!(accum.contains(&Column::from_name("a"))); + Ok(()) + } } diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index 6b8b1020cd6d8..7c5b70b19af0a 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -16,10 +16,11 @@ // under the License. use crate::analyzer::check_plan; -use crate::utils::{collect_subquery_cols, split_conjunction}; +use crate::utils::collect_subquery_cols; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_expr::expr_rewriter::strip_outer_reference; +use datafusion_expr::utils::split_conjunction; use datafusion_expr::{ Aggregate, BinaryExpr, Cast, Expr, Filter, Join, JoinType, LogicalPlan, Operator, Window, diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 6628e8961e263..eb5d8c53a5e08 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -42,16 +42,15 @@ use datafusion_expr::type_coercion::other::{ get_coerce_type_for_case_expression, get_coerce_type_for_list, }; use datafusion_expr::type_coercion::{is_datetime, is_utf8_or_large_utf8}; +use datafusion_expr::utils::merge_schema; use datafusion_expr::{ is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, type_coercion, window_function, AggregateFunction, BuiltinScalarFunction, Expr, - LogicalPlan, Operator, Projection, ScalarFunctionDefinition, WindowFrame, - WindowFrameBound, WindowFrameUnits, + ExprSchemable, LogicalPlan, Operator, Projection, ScalarFunctionDefinition, + Signature, WindowFrame, WindowFrameBound, WindowFrameUnits, }; -use datafusion_expr::{ExprSchemable, Signature}; use crate::analyzer::AnalyzerRule; -use crate::utils::merge_schema; #[derive(Default)] pub struct TypeCoercion {} diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index c8162683f39e2..ed6f472186d4c 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -16,15 +16,14 @@ // under the License. use crate::simplify_expressions::{ExprSimplifier, SimplifyContext}; -use crate::utils::{ - collect_subquery_cols, conjunction, find_join_exprs, split_conjunction, -}; +use crate::utils::collect_subquery_cols; use datafusion_common::tree_node::{ RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter, }; use datafusion_common::{plan_err, Result}; use datafusion_common::{Column, DFSchemaRef, DataFusionError, ScalarValue}; use datafusion_expr::expr::Alias; +use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction}; use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder}; use datafusion_physical_expr::execution_props::ExecutionProps; use std::collections::{BTreeSet, HashMap}; diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 96b46663d8e47..450336376a239 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -17,7 +17,7 @@ use crate::decorrelate::PullUpCorrelatedExpr; use crate::optimizer::ApplyOrder; -use crate::utils::{conjunction, replace_qualified_name, split_conjunction}; +use crate::utils::replace_qualified_name; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::alias::AliasGenerator; use datafusion_common::tree_node::TreeNode; @@ -25,6 +25,7 @@ use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::expr_rewriter::create_col_from_scalar_expr; use datafusion_expr::logical_plan::{JoinType, Subquery}; +use datafusion_expr::utils::{conjunction, split_conjunction}; use datafusion_expr::{ exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter, LogicalPlan, LogicalPlanBuilder, Operator, diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index 575969fbf73cf..24664d57c38d8 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -17,11 +17,10 @@ //! [`ExtractEquijoinPredicate`] rule that extracts equijoin predicates use crate::optimizer::ApplyOrder; -use crate::utils::split_conjunction; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::DFSchema; use datafusion_common::Result; -use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair}; +use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair, split_conjunction}; use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator}; use std::sync::Arc; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index e93565fef0a05..ada94960be267 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -47,7 +47,7 @@ use chrono::{DateTime, Utc}; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::LogicalPlan; use log::{debug, warn}; use std::collections::HashSet; use std::sync::Arc; diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 7a2c6a8d8ccdd..95eeee931b4f4 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -16,13 +16,13 @@ //! the plan. use crate::optimizer::ApplyOrder; -use crate::utils::{conjunction, split_conjunction, split_conjunction_owned}; -use crate::{utils, OptimizerConfig, OptimizerRule}; +use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{ internal_err, plan_datafusion_err, Column, DFSchema, DataFusionError, Result, }; use datafusion_expr::expr::Alias; +use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned}; use datafusion_expr::Volatility; use datafusion_expr::{ and, @@ -546,9 +546,7 @@ fn push_down_join( parent_predicate: Option<&Expr>, ) -> Result> { let predicates = match parent_predicate { - Some(parent_predicate) => { - utils::split_conjunction_owned(parent_predicate.clone()) - } + Some(parent_predicate) => split_conjunction_owned(parent_predicate.clone()), None => vec![], }; @@ -556,7 +554,7 @@ fn push_down_join( let on_filters = join .filter .as_ref() - .map(|e| utils::split_conjunction_owned(e.clone())) + .map(|e| split_conjunction_owned(e.clone())) .unwrap_or_default(); let mut is_inner_join = false; @@ -805,7 +803,7 @@ impl OptimizerRule for PushDownFilter { .map(|e| Ok(Column::from_qualified_name(e.display_name()?))) .collect::>>()?; - let predicates = utils::split_conjunction_owned(filter.predicate.clone()); + let predicates = split_conjunction_owned(filter.predicate.clone()); let mut keep_predicates = vec![]; let mut push_predicates = vec![]; @@ -853,7 +851,7 @@ impl OptimizerRule for PushDownFilter { } } LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { - let predicates = utils::split_conjunction_owned(filter.predicate.clone()); + let predicates = split_conjunction_owned(filter.predicate.clone()); push_down_all_join( predicates, vec![], @@ -908,7 +906,7 @@ impl OptimizerRule for PushDownFilter { let prevent_cols = extension_plan.node.prevent_predicate_push_down_columns(); - let predicates = utils::split_conjunction_owned(filter.predicate.clone()); + let predicates = split_conjunction_owned(filter.predicate.clone()); let mut keep_predicates = vec![]; let mut push_predicates = vec![]; diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 7ac0c25119c36..34ed4a9475cba 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -17,7 +17,7 @@ use crate::decorrelate::{PullUpCorrelatedExpr, UN_MATCHED_ROW_INDICATOR}; use crate::optimizer::ApplyOrder; -use crate::utils::{conjunction, replace_qualified_name}; +use crate::utils::replace_qualified_name; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::alias::AliasGenerator; use datafusion_common::tree_node::{ @@ -26,6 +26,7 @@ use datafusion_common::tree_node::{ use datafusion_common::{plan_err, Column, DataFusionError, Result, ScalarValue}; use datafusion_expr::expr_rewriter::create_col_from_scalar_expr; use datafusion_expr::logical_plan::{JoinType, Subquery}; +use datafusion_expr::utils::conjunction; use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder}; use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 9dc83e0fadf57..43a41b1185a33 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -20,10 +20,10 @@ use std::sync::Arc; use super::{ExprSimplifier, SimplifyContext}; -use crate::utils::merge_schema; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{DFSchema, DFSchemaRef, Result}; use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::utils::merge_schema; use datafusion_physical_expr::execution_props::ExecutionProps; /// Optimizer Pass that simplifies [`LogicalPlan`]s by rewriting diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 907c12b7afb1e..91603e82a54fc 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -19,7 +19,6 @@ //! of expr can be added if needed. //! This rule can reduce adding the `Expr::Cast` the expr instead of adding the `Expr::Cast` to literal expr. use crate::optimizer::ApplyOrder; -use crate::utils::merge_schema; use crate::{OptimizerConfig, OptimizerRule}; use arrow::datatypes::{ DataType, TimeUnit, MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION, @@ -31,6 +30,7 @@ use datafusion_common::{ }; use datafusion_expr::expr::{BinaryExpr, Cast, InList, TryCast}; use datafusion_expr::expr_rewriter::rewrite_preserving_name; +use datafusion_expr::utils::merge_schema; use datafusion_expr::{ binary_expr, in_list, lit, Expr, ExprSchemable, LogicalPlan, Operator, }; diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index a3e7e42875d7e..48f72ee7a0f87 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -18,19 +18,13 @@ //! Collection of utility functions that are leveraged by the query optimizer rules use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::DataFusionError; -use datafusion_common::{plan_err, Column, DFSchemaRef}; +use datafusion_common::{Column, DFSchemaRef}; use datafusion_common::{DFSchema, Result}; -use datafusion_expr::expr::{Alias, BinaryExpr}; -use datafusion_expr::expr_rewriter::{replace_col, strip_outer_reference}; -use datafusion_expr::{ - and, - logical_plan::{Filter, LogicalPlan}, - Expr, Operator, -}; +use datafusion_expr::expr_rewriter::replace_col; +use datafusion_expr::utils as expr_utils; +use datafusion_expr::{logical_plan::LogicalPlan, Expr, Operator}; use log::{debug, trace}; use std::collections::{BTreeSet, HashMap}; -use std::sync::Arc; /// Convenience rule for writing optimizers: recursively invoke /// optimize on plan's children and then return a node of the same @@ -58,29 +52,55 @@ pub fn optimize_children( } } +pub(crate) fn collect_subquery_cols( + exprs: &[Expr], + subquery_schema: DFSchemaRef, +) -> Result> { + exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| { + let mut using_cols: Vec = vec![]; + for col in expr.to_columns()?.into_iter() { + if subquery_schema.has_column(&col) { + using_cols.push(col); + } + } + + cols.extend(using_cols); + Result::<_>::Ok(cols) + }) +} + +pub(crate) fn replace_qualified_name( + expr: Expr, + cols: &BTreeSet, + subquery_alias: &str, +) -> Result { + let alias_cols: Vec = cols + .iter() + .map(|col| { + Column::from_qualified_name(format!("{}.{}", subquery_alias, col.name)) + }) + .collect(); + let replace_map: HashMap<&Column, &Column> = + cols.iter().zip(alias_cols.iter()).collect(); + + replace_col(expr, &replace_map) +} + +/// Log the plan in debug/tracing mode after some part of the optimizer runs +pub fn log_plan(description: &str, plan: &LogicalPlan) { + debug!("{description}:\n{}\n", plan.display_indent()); + trace!("{description}::\n{}\n", plan.display_indent_schema()); +} + /// Splits a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]` /// /// See [`split_conjunction_owned`] for more details and an example. +#[deprecated( + since = "34.0.0", + note = "use `datafusion_expr::utils::split_conjunction` instead" +)] pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> { - split_conjunction_impl(expr, vec![]) -} - -fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<&'a Expr> { - match expr { - Expr::BinaryExpr(BinaryExpr { - right, - op: Operator::And, - left, - }) => { - let exprs = split_conjunction_impl(left, exprs); - split_conjunction_impl(right, exprs) - } - Expr::Alias(Alias { expr, .. }) => split_conjunction_impl(expr, exprs), - other => { - exprs.push(other); - exprs - } - } + expr_utils::split_conjunction(expr) } /// Splits an owned conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]` @@ -104,8 +124,12 @@ fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<& /// // use split_conjunction_owned to split them /// assert_eq!(split_conjunction_owned(expr), split); /// ``` +#[deprecated( + since = "34.0.0", + note = "use `datafusion_expr::utils::split_conjunction_owned` instead" +)] pub fn split_conjunction_owned(expr: Expr) -> Vec { - split_binary_owned(expr, Operator::And) + expr_utils::split_conjunction_owned(expr) } /// Splits an owned binary operator tree [`Expr`] such as `A B C` => `[A, B, C]` @@ -130,53 +154,23 @@ pub fn split_conjunction_owned(expr: Expr) -> Vec { /// // use split_binary_owned to split them /// assert_eq!(split_binary_owned(expr, Operator::Plus), split); /// ``` +#[deprecated( + since = "34.0.0", + note = "use `datafusion_expr::utils::split_binary_owned` instead" +)] pub fn split_binary_owned(expr: Expr, op: Operator) -> Vec { - split_binary_owned_impl(expr, op, vec![]) -} - -fn split_binary_owned_impl( - expr: Expr, - operator: Operator, - mut exprs: Vec, -) -> Vec { - match expr { - Expr::BinaryExpr(BinaryExpr { right, op, left }) if op == operator => { - let exprs = split_binary_owned_impl(*left, operator, exprs); - split_binary_owned_impl(*right, operator, exprs) - } - Expr::Alias(Alias { expr, .. }) => { - split_binary_owned_impl(*expr, operator, exprs) - } - other => { - exprs.push(other); - exprs - } - } + expr_utils::split_binary_owned(expr, op) } /// Splits an binary operator tree [`Expr`] such as `A B C` => `[A, B, C]` /// /// See [`split_binary_owned`] for more details and an example. +#[deprecated( + since = "34.0.0", + note = "use `datafusion_expr::utils::split_binary` instead" +)] pub fn split_binary(expr: &Expr, op: Operator) -> Vec<&Expr> { - split_binary_impl(expr, op, vec![]) -} - -fn split_binary_impl<'a>( - expr: &'a Expr, - operator: Operator, - mut exprs: Vec<&'a Expr>, -) -> Vec<&'a Expr> { - match expr { - Expr::BinaryExpr(BinaryExpr { right, op, left }) if *op == operator => { - let exprs = split_binary_impl(left, operator, exprs); - split_binary_impl(right, operator, exprs) - } - Expr::Alias(Alias { expr, .. }) => split_binary_impl(expr, operator, exprs), - other => { - exprs.push(other); - exprs - } - } + expr_utils::split_binary(expr, op) } /// Combines an array of filter expressions into a single filter @@ -201,8 +195,12 @@ fn split_binary_impl<'a>( /// // use conjunction to join them together with `AND` /// assert_eq!(conjunction(split), Some(expr)); /// ``` +#[deprecated( + since = "34.0.0", + note = "use `datafusion_expr::utils::conjunction` instead" +)] pub fn conjunction(filters: impl IntoIterator) -> Option { - filters.into_iter().reduce(|accum, expr| accum.and(expr)) + expr_utils::conjunction(filters) } /// Combines an array of filter expressions into a single filter @@ -210,25 +208,22 @@ pub fn conjunction(filters: impl IntoIterator) -> Option { /// logical OR. /// /// Returns None if the filters array is empty. +#[deprecated( + since = "34.0.0", + note = "use `datafusion_expr::utils::disjunction` instead" +)] pub fn disjunction(filters: impl IntoIterator) -> Option { - filters.into_iter().reduce(|accum, expr| accum.or(expr)) + expr_utils::disjunction(filters) } /// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with /// its predicate be all `predicates` ANDed. +#[deprecated( + since = "34.0.0", + note = "use `datafusion_expr::utils::add_filter` instead" +)] pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> Result { - // reduce filters to a single filter with an AND - let predicate = predicates - .iter() - .skip(1) - .fold(predicates[0].clone(), |acc, predicate| { - and(acc, (*predicate).to_owned()) - }); - - Ok(LogicalPlan::Filter(Filter::try_new( - predicate, - Arc::new(plan), - )?)) + expr_utils::add_filter(plan, predicates) } /// Looks for correlating expressions: for example, a binary expression with one field from the subquery, and @@ -241,22 +236,12 @@ pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> Result) -> Result<(Vec, Vec)> { - let mut joins = vec![]; - let mut others = vec![]; - for filter in exprs.into_iter() { - // If the expression contains correlated predicates, add it to join filters - if filter.contains_outer() { - if !matches!(filter, Expr::BinaryExpr(BinaryExpr{ left, op: Operator::Eq, right }) if left.eq(right)) - { - joins.push(strip_outer_reference((*filter).clone())); - } - } else { - others.push((*filter).clone()); - } - } - - Ok((joins, others)) + expr_utils::find_join_exprs(exprs) } /// Returns the first (and only) element in a slice, or an error @@ -268,215 +253,19 @@ pub fn find_join_exprs(exprs: Vec<&Expr>) -> Result<(Vec, Vec)> { /// # Return value /// /// The first element, or an error +#[deprecated( + since = "34.0.0", + note = "use `datafusion_expr::utils::only_or_err` instead" +)] pub fn only_or_err(slice: &[T]) -> Result<&T> { - match slice { - [it] => Ok(it), - [] => plan_err!("No items found!"), - _ => plan_err!("More than one item found!"), - } + expr_utils::only_or_err(slice) } /// merge inputs schema into a single schema. +#[deprecated( + since = "34.0.0", + note = "use `datafusion_expr::utils::merge_schema` instead" +)] pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema { - if inputs.len() == 1 { - inputs[0].schema().clone().as_ref().clone() - } else { - inputs.iter().map(|input| input.schema()).fold( - DFSchema::empty(), - |mut lhs, rhs| { - lhs.merge(rhs); - lhs - }, - ) - } -} - -pub(crate) fn collect_subquery_cols( - exprs: &[Expr], - subquery_schema: DFSchemaRef, -) -> Result> { - exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| { - let mut using_cols: Vec = vec![]; - for col in expr.to_columns()?.into_iter() { - if subquery_schema.has_column(&col) { - using_cols.push(col); - } - } - - cols.extend(using_cols); - Result::<_>::Ok(cols) - }) -} - -pub(crate) fn replace_qualified_name( - expr: Expr, - cols: &BTreeSet, - subquery_alias: &str, -) -> Result { - let alias_cols: Vec = cols - .iter() - .map(|col| { - Column::from_qualified_name(format!("{}.{}", subquery_alias, col.name)) - }) - .collect(); - let replace_map: HashMap<&Column, &Column> = - cols.iter().zip(alias_cols.iter()).collect(); - - replace_col(expr, &replace_map) -} - -/// Log the plan in debug/tracing mode after some part of the optimizer runs -pub fn log_plan(description: &str, plan: &LogicalPlan) { - debug!("{description}:\n{}\n", plan.display_indent()); - trace!("{description}::\n{}\n", plan.display_indent_schema()); -} - -#[cfg(test)] -mod tests { - use super::*; - use arrow::datatypes::DataType; - use datafusion_common::Column; - use datafusion_expr::expr::Cast; - use datafusion_expr::{col, lit, utils::expr_to_columns}; - use std::collections::HashSet; - - #[test] - fn test_split_conjunction() { - let expr = col("a"); - let result = split_conjunction(&expr); - assert_eq!(result, vec![&expr]); - } - - #[test] - fn test_split_conjunction_two() { - let expr = col("a").eq(lit(5)).and(col("b")); - let expr1 = col("a").eq(lit(5)); - let expr2 = col("b"); - - let result = split_conjunction(&expr); - assert_eq!(result, vec![&expr1, &expr2]); - } - - #[test] - fn test_split_conjunction_alias() { - let expr = col("a").eq(lit(5)).and(col("b").alias("the_alias")); - let expr1 = col("a").eq(lit(5)); - let expr2 = col("b"); // has no alias - - let result = split_conjunction(&expr); - assert_eq!(result, vec![&expr1, &expr2]); - } - - #[test] - fn test_split_conjunction_or() { - let expr = col("a").eq(lit(5)).or(col("b")); - let result = split_conjunction(&expr); - assert_eq!(result, vec![&expr]); - } - - #[test] - fn test_split_binary_owned() { - let expr = col("a"); - assert_eq!(split_binary_owned(expr.clone(), Operator::And), vec![expr]); - } - - #[test] - fn test_split_binary_owned_two() { - assert_eq!( - split_binary_owned(col("a").eq(lit(5)).and(col("b")), Operator::And), - vec![col("a").eq(lit(5)), col("b")] - ); - } - - #[test] - fn test_split_binary_owned_different_op() { - let expr = col("a").eq(lit(5)).or(col("b")); - assert_eq!( - // expr is connected by OR, but pass in AND - split_binary_owned(expr.clone(), Operator::And), - vec![expr] - ); - } - - #[test] - fn test_split_conjunction_owned() { - let expr = col("a"); - assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]); - } - - #[test] - fn test_split_conjunction_owned_two() { - assert_eq!( - split_conjunction_owned(col("a").eq(lit(5)).and(col("b"))), - vec![col("a").eq(lit(5)), col("b")] - ); - } - - #[test] - fn test_split_conjunction_owned_alias() { - assert_eq!( - split_conjunction_owned(col("a").eq(lit(5)).and(col("b").alias("the_alias"))), - vec![ - col("a").eq(lit(5)), - // no alias on b - col("b"), - ] - ); - } - - #[test] - fn test_conjunction_empty() { - assert_eq!(conjunction(vec![]), None); - } - - #[test] - fn test_conjunction() { - // `[A, B, C]` - let expr = conjunction(vec![col("a"), col("b"), col("c")]); - - // --> `(A AND B) AND C` - assert_eq!(expr, Some(col("a").and(col("b")).and(col("c")))); - - // which is different than `A AND (B AND C)` - assert_ne!(expr, Some(col("a").and(col("b").and(col("c"))))); - } - - #[test] - fn test_disjunction_empty() { - assert_eq!(disjunction(vec![]), None); - } - - #[test] - fn test_disjunction() { - // `[A, B, C]` - let expr = disjunction(vec![col("a"), col("b"), col("c")]); - - // --> `(A OR B) OR C` - assert_eq!(expr, Some(col("a").or(col("b")).or(col("c")))); - - // which is different than `A OR (B OR C)` - assert_ne!(expr, Some(col("a").or(col("b").or(col("c"))))); - } - - #[test] - fn test_split_conjunction_owned_or() { - let expr = col("a").eq(lit(5)).or(col("b")); - assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]); - } - - #[test] - fn test_collect_expr() -> Result<()> { - let mut accum: HashSet = HashSet::new(); - expr_to_columns( - &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)), - &mut accum, - )?; - expr_to_columns( - &Expr::Cast(Cast::new(Box::new(col("a")), DataType::Float64)), - &mut accum, - )?; - assert_eq!(1, accum.len()); - assert!(accum.contains(&Column::from_name("a"))); - Ok(()) - } + expr_utils::merge_schema(inputs) } diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 5cb72adaca4df..b7a51032dcd9e 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -32,7 +32,7 @@ use datafusion::prelude::JoinType; use datafusion::sql::TableReference; use datafusion::{ error::{DataFusionError, Result}, - optimizer::utils::split_conjunction, + logical_expr::utils::split_conjunction, prelude::{Column, SessionContext}, scalar::ScalarValue, };