From 394325b7c57157e32e167d3e6b41c4da945fff64 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 22 Dec 2023 15:44:24 +0300 Subject: [PATCH 1/5] Initial commit --- datafusion/expr/src/logical_plan/plan.rs | 47 ++++++++++++++++++- .../optimizer/src/eliminate_cross_join.rs | 39 +++++++++++++++ datafusion/optimizer/src/push_down_filter.rs | 2 +- datafusion/sqllogictest/test_files/joins.slt | 18 +++++++ 4 files changed, 104 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 1f3711407a14..049414892859 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -780,7 +780,52 @@ impl LogicalPlan { LogicalPlan::CrossJoin(_) => { let left = inputs[0].clone(); let right = inputs[1].clone(); - LogicalPlanBuilder::from(left).cross_join(right)?.build() + if expr.is_empty() { + // If expr is empty, construct cross join from children + LogicalPlanBuilder::from(left).cross_join(right)?.build() + } else { + let exprs = expr + .iter() + .flat_map(|expr| split_conjunction(expr)) + .collect::>(); + let mut new_on: Vec<(Expr, Expr)> = vec![]; + let mut new_filters = vec![]; + for expr in exprs { + // SimplifyExpression rule may add alias to the equi_expr. + let unalias_expr = expr.clone().unalias(); + if let Expr::BinaryExpr(BinaryExpr { + left, + op: Operator::Eq, + right, + }) = unalias_expr + { + new_on.push((*left, *right)); + } else { + new_filters.push(expr.clone()); + } + } + let filter = if new_filters.is_empty() { + None + } else { + Some(new_filters.into_iter().reduce(Expr::and).unwrap()) + }; + let join_schema = build_join_schema( + left.schema(), + right.schema(), + &JoinType::Inner, + )?; + // predicate is given + Ok(LogicalPlan::Join(Join { + left: Arc::new(left), + right: Arc::new(right), + join_type: JoinType::Inner, + join_constraint: JoinConstraint::On, + on: new_on, + filter, + schema: DFSchemaRef::new(join_schema), + null_equals_null: true, + })) + } } LogicalPlan::Subquery(Subquery { outer_ref_columns, .. diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index cf9a59d6b892..9366fc7fc799 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -120,6 +120,45 @@ impl OptimizerRule for EliminateCrossJoin { } } } + LogicalPlan::Join(Join { + join_type: JoinType::Inner, + .. + }) => { + let mut possible_join_keys: Vec<(Expr, Expr)> = vec![]; + let mut all_inputs: Vec = vec![]; + let did_flat_successfully = try_flatten_join_inputs( + &plan, + &mut possible_join_keys, + &mut all_inputs, + )?; + + if !did_flat_successfully { + return Ok(None); + } + + // join keys are handled locally + let mut all_join_keys: HashSet<(Expr, Expr)> = HashSet::new(); + + let mut left = all_inputs.remove(0); + while !all_inputs.is_empty() { + left = find_inner_join( + &left, + &mut all_inputs, + &mut possible_join_keys, + &mut all_join_keys, + )?; + } + + left = utils::optimize_children(self, &left, config)?.unwrap_or(left); + + if plan.schema() != left.schema() { + left = LogicalPlan::Projection(Projection::new_from_schema( + Arc::new(left.clone()), + plan.schema().clone(), + )); + } + Ok(Some(left)) + } _ => utils::optimize_children(self, plan, config), } diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 4bea17500acc..02eb5ca1cb14 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -857,7 +857,7 @@ impl OptimizerRule for PushDownFilter { left, right, vec![], - false, + true, )? } LogicalPlan::TableScan(scan) => { diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 1ad17fbb8c91..6b7765a039d0 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3466,6 +3466,24 @@ SortPreservingMergeExec: [a@0 ASC] ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], has_header=true +query TT +EXPLAIN SELECT * +FROM annotated_data as l, annotated_data as r +WHERE l.a > r.a +---- +logical_plan +Inner Join: Filter: l.a > r.a +--SubqueryAlias: l +----TableScan: annotated_data projection=[a0, a, b, c, d] +--SubqueryAlias: r +----TableScan: annotated_data projection=[a0, a, b, c, d] +physical_plan +NestedLoopJoinExec: join_type=Inner, filter=a@0 > a@1 +--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true +--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true + + #### # Config teardown #### From 739bfdfa279d1a68aedf26e5fab8b948bb971197 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 22 Dec 2023 15:50:25 +0300 Subject: [PATCH 2/5] Simplifications --- datafusion/expr/src/logical_plan/plan.rs | 47 +----- .../optimizer/src/eliminate_cross_join.rs | 140 ++++++++---------- datafusion/optimizer/src/push_down_filter.rs | 63 ++++++-- datafusion/sqllogictest/test_files/joins.slt | 1 - 4 files changed, 108 insertions(+), 143 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 049414892859..1f3711407a14 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -780,52 +780,7 @@ impl LogicalPlan { LogicalPlan::CrossJoin(_) => { let left = inputs[0].clone(); let right = inputs[1].clone(); - if expr.is_empty() { - // If expr is empty, construct cross join from children - LogicalPlanBuilder::from(left).cross_join(right)?.build() - } else { - let exprs = expr - .iter() - .flat_map(|expr| split_conjunction(expr)) - .collect::>(); - let mut new_on: Vec<(Expr, Expr)> = vec![]; - let mut new_filters = vec![]; - for expr in exprs { - // SimplifyExpression rule may add alias to the equi_expr. - let unalias_expr = expr.clone().unalias(); - if let Expr::BinaryExpr(BinaryExpr { - left, - op: Operator::Eq, - right, - }) = unalias_expr - { - new_on.push((*left, *right)); - } else { - new_filters.push(expr.clone()); - } - } - let filter = if new_filters.is_empty() { - None - } else { - Some(new_filters.into_iter().reduce(Expr::and).unwrap()) - }; - let join_schema = build_join_schema( - left.schema(), - right.schema(), - &JoinType::Inner, - )?; - // predicate is given - Ok(LogicalPlan::Join(Join { - left: Arc::new(left), - right: Arc::new(right), - join_type: JoinType::Inner, - join_constraint: JoinConstraint::On, - on: new_on, - filter, - schema: DFSchemaRef::new(join_schema), - null_equals_null: true, - })) - } + LogicalPlanBuilder::from(left).cross_join(right)?.build() } LogicalPlan::Subquery(Subquery { outer_ref_columns, .. diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 9366fc7fc799..a882d19e89ba 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -54,13 +54,14 @@ impl OptimizerRule for EliminateCrossJoin { plan: &LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { - match plan { + let mut possible_join_keys: Vec<(Expr, Expr)> = vec![]; + let mut all_inputs: Vec = vec![]; + let mut parent_predicate = None; + let did_flat_successfully = match plan { LogicalPlan::Filter(filter) => { let input = filter.input.as_ref().clone(); - - let mut possible_join_keys: Vec<(Expr, Expr)> = vec![]; - let mut all_inputs: Vec = vec![]; - let did_flat_successfully = match &input { + parent_predicate = Some(&filter.predicate); + match &input { LogicalPlan::Join(Join { join_type: JoinType::Inner, .. @@ -73,94 +74,69 @@ impl OptimizerRule for EliminateCrossJoin { _ => { return utils::optimize_children(self, plan, config); } - }; - - if !did_flat_successfully { - return Ok(None); - } - - let predicate = &filter.predicate; - // join keys are handled locally - let mut all_join_keys: HashSet<(Expr, Expr)> = HashSet::new(); - - extract_possible_join_keys(predicate, &mut possible_join_keys)?; - - let mut left = all_inputs.remove(0); - while !all_inputs.is_empty() { - left = find_inner_join( - &left, - &mut all_inputs, - &mut possible_join_keys, - &mut all_join_keys, - )?; - } - - left = utils::optimize_children(self, &left, config)?.unwrap_or(left); - - if plan.schema() != left.schema() { - left = LogicalPlan::Projection(Projection::new_from_schema( - Arc::new(left.clone()), - plan.schema().clone(), - )); - } - - // if there are no join keys then do nothing. - if all_join_keys.is_empty() { - Ok(Some(LogicalPlan::Filter(Filter::try_new( - predicate.clone(), - Arc::new(left), - )?))) - } else { - // remove join expressions from filter - match remove_join_expressions(predicate, &all_join_keys)? { - Some(filter_expr) => Ok(Some(LogicalPlan::Filter( - Filter::try_new(filter_expr, Arc::new(left))?, - ))), - _ => Ok(Some(left)), - } } } LogicalPlan::Join(Join { join_type: JoinType::Inner, .. }) => { - let mut possible_join_keys: Vec<(Expr, Expr)> = vec![]; - let mut all_inputs: Vec = vec![]; - let did_flat_successfully = try_flatten_join_inputs( - &plan, - &mut possible_join_keys, - &mut all_inputs, - )?; - - if !did_flat_successfully { - return Ok(None); - } + try_flatten_join_inputs(plan, &mut possible_join_keys, &mut all_inputs)? + } - // join keys are handled locally - let mut all_join_keys: HashSet<(Expr, Expr)> = HashSet::new(); + _ => return utils::optimize_children(self, plan, config), + }; + if !did_flat_successfully { + return Ok(None); + } - let mut left = all_inputs.remove(0); - while !all_inputs.is_empty() { - left = find_inner_join( - &left, - &mut all_inputs, - &mut possible_join_keys, - &mut all_join_keys, - )?; - } + // join keys are handled locally + let mut all_join_keys: HashSet<(Expr, Expr)> = HashSet::new(); - left = utils::optimize_children(self, &left, config)?.unwrap_or(left); + if let Some(predicate) = parent_predicate { + extract_possible_join_keys(predicate, &mut possible_join_keys)?; + } - if plan.schema() != left.schema() { - left = LogicalPlan::Projection(Projection::new_from_schema( - Arc::new(left.clone()), - plan.schema().clone(), - )); - } - Ok(Some(left)) - } + let mut left = all_inputs.remove(0); + while !all_inputs.is_empty() { + left = find_inner_join( + &left, + &mut all_inputs, + &mut possible_join_keys, + &mut all_join_keys, + )?; + } + + left = utils::optimize_children(self, &left, config)?.unwrap_or(left); + + if plan.schema() != left.schema() { + left = LogicalPlan::Projection(Projection::new_from_schema( + Arc::new(left.clone()), + plan.schema().clone(), + )); + } + + let predicate = if let Some(predicate) = parent_predicate { + // there is a filter at the top. Consider its predicate also. + predicate + } else { + return Ok(Some(left)); + }; - _ => utils::optimize_children(self, plan, config), + // if there are no join keys then do nothing. + if all_join_keys.is_empty() { + Ok(Some(LogicalPlan::Filter(Filter::try_new( + predicate.clone(), + Arc::new(left), + )?))) + } else { + // remove join expressions from filter + match remove_join_expressions(predicate, &all_join_keys)? { + Some(filter_expr) => Ok(Some(LogicalPlan::Filter(Filter::try_new( + filter_expr, + Arc::new(left), + )?))), + _ => Ok(Some(left)), + } } } diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 02eb5ca1cb14..5da0ffcc850d 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -19,18 +19,19 @@ use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{ - internal_err, plan_datafusion_err, Column, DFSchema, DataFusionError, Result, + internal_err, plan_datafusion_err, Column, DFSchema, DFSchemaRef, DataFusionError, + JoinConstraint, Result, }; use datafusion_expr::expr::Alias; use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned}; -use datafusion_expr::Volatility; use datafusion_expr::{ and, expr_rewriter::replace_col, logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union}, - or, BinaryExpr, Expr, Filter, Operator, ScalarFunctionDefinition, + or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator, ScalarFunctionDefinition, TableProviderFilterPushDown, }; +use datafusion_expr::{build_join_schema, Volatility}; use itertools::Itertools; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -848,17 +849,23 @@ impl OptimizerRule for PushDownFilter { None => return Ok(None), } } - LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { + LogicalPlan::CrossJoin(cross_join) => { let predicates = split_conjunction_owned(filter.predicate.clone()); - push_down_all_join( + let join = convert_cross_join_to_inner_join(cross_join.clone())?; + let join_plan = LogicalPlan::Join(join); + let inputs = join_plan.inputs(); + let left = inputs[0]; + let right = inputs[1]; + let plan = push_down_all_join( predicates, vec![], - &filter.input, + &join_plan, left, right, vec![], true, - )? + )?; + convert_to_cross_join_if_beneficial(plan)? } LogicalPlan::TableScan(scan) => { let filter_predicates = split_conjunction(&filter.predicate); @@ -955,6 +962,36 @@ impl PushDownFilter { } } +/// Convert cross join to join by pushing down filter predicate to the join condition +fn convert_cross_join_to_inner_join(cross_join: CrossJoin) -> Result { + let CrossJoin { left, right, .. } = cross_join; + let join_schema = build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?; + // predicate is given + Ok(Join { + left, + right, + join_type: JoinType::Inner, + join_constraint: JoinConstraint::On, + on: vec![], + filter: None, + schema: DFSchemaRef::new(join_schema), + null_equals_null: true, + }) +} + +/// Converts the inner join with empty equality predicate and empty filter condition to the cross join +fn convert_to_cross_join_if_beneficial(plan: LogicalPlan) -> Result { + if let LogicalPlan::Join(join) = &plan { + // Can be converted back to cross join + if join.on.is_empty() && join.filter.is_none() { + return LogicalPlanBuilder::from(join.left.as_ref().clone()) + .cross_join(join.right.as_ref().clone())? + .build(); + } + } + Ok(plan) +} + /// replaces columns by its name on the projection. pub fn replace_cols_by_name( e: Expr, @@ -2665,14 +2702,12 @@ Projection: a, b .cross_join(right)? .filter(filter)? .build()?; - let expected = "\ - Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c < UInt32(10)\ - \n CrossJoin:\ - \n Projection: test.a, test.b, test.c\ - \n TableScan: test, full_filters=[test.b > UInt32(1) OR test.c < UInt32(10)]\ - \n Projection: test1.a AS d, test1.a AS e\ - \n TableScan: test1"; + Inner Join: Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c < UInt32(10)\ + \n Projection: test.a, test.b, test.c\ + \n TableScan: test, full_filters=[test.b > UInt32(1) OR test.c < UInt32(10)]\ + \n Projection: test1.a AS d, test1.a AS e\ + \n TableScan: test1"; assert_optimized_plan_eq_with_rewrite_predicate(&plan, expected)?; // Originally global state which can help to avoid duplicate Filters been generated and pushed down. diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 6b7765a039d0..eee213811f44 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3483,7 +3483,6 @@ NestedLoopJoinExec: join_type=Inner, filter=a@0 > a@1 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true --CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true - #### # Config teardown #### From b52247094fd070bbe561f25ebc4519e8e1159add Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Sat, 23 Dec 2023 12:41:10 +0300 Subject: [PATCH 3/5] Review --- .../optimizer/src/eliminate_cross_join.rs | 64 ++++++++----------- datafusion/optimizer/src/push_down_filter.rs | 28 ++++---- 2 files changed, 45 insertions(+), 47 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index a882d19e89ba..1311e7bd0610 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -20,6 +20,7 @@ use std::collections::HashSet; use std::sync::Arc; use crate::{utils, OptimizerConfig, OptimizerRule}; + use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_expr::expr::{BinaryExpr, Expr}; use datafusion_expr::logical_plan::{ @@ -47,7 +48,6 @@ impl EliminateCrossJoin { /// For above queries, the join predicate is available in filters and they are moved to /// join nodes appropriately /// This fix helps to improve the performance of TPCH Q19. issue#78 -/// impl OptimizerRule for EliminateCrossJoin { fn try_optimize( &self, @@ -57,7 +57,7 @@ impl OptimizerRule for EliminateCrossJoin { let mut possible_join_keys: Vec<(Expr, Expr)> = vec![]; let mut all_inputs: Vec = vec![]; let mut parent_predicate = None; - let did_flat_successfully = match plan { + if !match plan { LogicalPlan::Filter(filter) => { let input = filter.input.as_ref().clone(); parent_predicate = Some(&filter.predicate); @@ -66,11 +66,18 @@ impl OptimizerRule for EliminateCrossJoin { join_type: JoinType::Inner, .. }) - | LogicalPlan::CrossJoin(_) => try_flatten_join_inputs( - &input, - &mut possible_join_keys, - &mut all_inputs, - )?, + | LogicalPlan::CrossJoin(_) => { + let success = try_flatten_join_inputs( + &input, + &mut possible_join_keys, + &mut all_inputs, + )?; + extract_possible_join_keys( + &filter.predicate, + &mut possible_join_keys, + )?; + success + } _ => { return utils::optimize_children(self, plan, config); } @@ -82,20 +89,13 @@ impl OptimizerRule for EliminateCrossJoin { }) => { try_flatten_join_inputs(plan, &mut possible_join_keys, &mut all_inputs)? } - _ => return utils::optimize_children(self, plan, config), - }; - if !did_flat_successfully { + } { return Ok(None); } - // join keys are handled locally - let mut all_join_keys: HashSet<(Expr, Expr)> = HashSet::new(); - - if let Some(predicate) = parent_predicate { - extract_possible_join_keys(predicate, &mut possible_join_keys)?; - } - + // Join keys are handled locally: + let mut all_join_keys = HashSet::<(Expr, Expr)>::new(); let mut left = all_inputs.remove(0); while !all_inputs.is_empty() { left = find_inner_join( @@ -115,26 +115,19 @@ impl OptimizerRule for EliminateCrossJoin { )); } - let predicate = if let Some(predicate) = parent_predicate { - // there is a filter at the top. Consider its predicate also. - predicate - } else { + let Some(predicate) = parent_predicate else { return Ok(Some(left)); }; - // if there are no join keys then do nothing. + // If there are no join keys then do nothing: if all_join_keys.is_empty() { - Ok(Some(LogicalPlan::Filter(Filter::try_new( - predicate.clone(), - Arc::new(left), - )?))) + Filter::try_new(predicate.clone(), Arc::new(left)) + .map(|f| Some(LogicalPlan::Filter(f))) } else { - // remove join expressions from filter + // Remove join expressions from filter: match remove_join_expressions(predicate, &all_join_keys)? { - Some(filter_expr) => Ok(Some(LogicalPlan::Filter(Filter::try_new( - filter_expr, - Arc::new(left), - )?))), + Some(filter_expr) => Filter::try_new(filter_expr, Arc::new(left)) + .map(|f| Some(LogicalPlan::Filter(f))), _ => Ok(Some(left)), } } @@ -340,17 +333,16 @@ fn remove_join_expressions( #[cfg(test)] mod tests { + use super::*; + use crate::optimizer::OptimizerContext; + use crate::test::*; + use datafusion_expr::{ binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Operator::{And, Or}, }; - use crate::optimizer::OptimizerContext; - use crate::test::*; - - use super::*; - fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: Vec<&str>) { let rule = EliminateCrossJoin::new(); let optimized_plan = rule diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 5da0ffcc850d..4eed39a08941 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -15,26 +15,29 @@ //! [`PushDownFilter`] Moves filters so they are applied as early as possible in //! the plan. +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; + use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{ internal_err, plan_datafusion_err, Column, DFSchema, DFSchemaRef, DataFusionError, JoinConstraint, Result, }; use datafusion_expr::expr::Alias; +use datafusion_expr::expr_rewriter::replace_col; +use datafusion_expr::logical_plan::{ + CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union, +}; use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned}; use datafusion_expr::{ - and, - expr_rewriter::replace_col, - logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union}, - or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator, ScalarFunctionDefinition, - TableProviderFilterPushDown, + and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator, + ScalarFunctionDefinition, TableProviderFilterPushDown, Volatility, }; -use datafusion_expr::{build_join_schema, Volatility}; + use itertools::Itertools; -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; /// Optimizer rule for pushing (moving) filter expressions down in a plan so /// they are applied as early as possible. @@ -1063,13 +1066,16 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { #[cfg(test)] mod tests { + use std::fmt::{Debug, Formatter}; + use std::sync::Arc; + use super::*; use crate::optimizer::Optimizer; use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate; use crate::test::*; use crate::OptimizerContext; + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use async_trait::async_trait; use datafusion_common::{DFSchema, DFSchemaRef}; use datafusion_expr::logical_plan::table_scan; use datafusion_expr::{ @@ -1077,8 +1083,8 @@ mod tests { BinaryExpr, Expr, Extension, LogicalPlanBuilder, Operator, TableSource, TableType, UserDefinedLogicalNodeCore, }; - use std::fmt::{Debug, Formatter}; - use std::sync::Arc; + + use async_trait::async_trait; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { crate::test::assert_optimized_plan_eq( From 50a0f84aa23317ae5e2389bd3e8984132245e4a4 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Sat, 23 Dec 2023 13:12:37 +0300 Subject: [PATCH 4/5] Review Part 2 --- .../optimizer/src/eliminate_cross_join.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 1311e7bd0610..666130dc5394 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -59,23 +59,25 @@ impl OptimizerRule for EliminateCrossJoin { let mut parent_predicate = None; if !match plan { LogicalPlan::Filter(filter) => { - let input = filter.input.as_ref().clone(); + let input = filter.input.as_ref(); parent_predicate = Some(&filter.predicate); - match &input { + match input { LogicalPlan::Join(Join { join_type: JoinType::Inner, .. }) | LogicalPlan::CrossJoin(_) => { let success = try_flatten_join_inputs( - &input, + input, &mut possible_join_keys, &mut all_inputs, )?; - extract_possible_join_keys( - &filter.predicate, - &mut possible_join_keys, - )?; + if success { + extract_possible_join_keys( + &filter.predicate, + &mut possible_join_keys, + )?; + } success } _ => { @@ -110,7 +112,7 @@ impl OptimizerRule for EliminateCrossJoin { if plan.schema() != left.schema() { left = LogicalPlan::Projection(Projection::new_from_schema( - Arc::new(left.clone()), + Arc::new(left), plan.schema().clone(), )); } From ed28ce2f2562ba567b4cd8395e098b6bd89f2f08 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Sat, 23 Dec 2023 15:11:30 +0300 Subject: [PATCH 5/5] More idiomatic Rust --- .../optimizer/src/eliminate_cross_join.rs | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 666130dc5394..7c866950a622 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -56,29 +56,27 @@ impl OptimizerRule for EliminateCrossJoin { ) -> Result> { let mut possible_join_keys: Vec<(Expr, Expr)> = vec![]; let mut all_inputs: Vec = vec![]; - let mut parent_predicate = None; - if !match plan { + let parent_predicate = match plan { LogicalPlan::Filter(filter) => { let input = filter.input.as_ref(); - parent_predicate = Some(&filter.predicate); match input { LogicalPlan::Join(Join { join_type: JoinType::Inner, .. }) | LogicalPlan::CrossJoin(_) => { - let success = try_flatten_join_inputs( + if !try_flatten_join_inputs( input, &mut possible_join_keys, &mut all_inputs, - )?; - if success { - extract_possible_join_keys( - &filter.predicate, - &mut possible_join_keys, - )?; + )? { + return Ok(None); } - success + extract_possible_join_keys( + &filter.predicate, + &mut possible_join_keys, + )?; + Some(&filter.predicate) } _ => { return utils::optimize_children(self, plan, config); @@ -89,12 +87,17 @@ impl OptimizerRule for EliminateCrossJoin { join_type: JoinType::Inner, .. }) => { - try_flatten_join_inputs(plan, &mut possible_join_keys, &mut all_inputs)? + if !try_flatten_join_inputs( + plan, + &mut possible_join_keys, + &mut all_inputs, + )? { + return Ok(None); + } + None } _ => return utils::optimize_children(self, plan, config), - } { - return Ok(None); - } + }; // Join keys are handled locally: let mut all_join_keys = HashSet::<(Expr, Expr)>::new();