-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Filter pushdown into cross join #8626
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ use std::collections::HashSet; | |
use std::sync::Arc; | ||
|
||
use crate::{utils, OptimizerConfig, OptimizerRule}; | ||
|
||
use datafusion_common::{plan_err, DataFusionError, Result}; | ||
use datafusion_expr::expr::{BinaryExpr, Expr}; | ||
use datafusion_expr::logical_plan::{ | ||
|
@@ -47,81 +48,90 @@ impl EliminateCrossJoin { | |
/// For above queries, the join predicate is available in filters and they are moved to | ||
/// join nodes appropriately | ||
/// This fix helps to improve the performance of TPCH Q19. issue#78 | ||
/// | ||
impl OptimizerRule for EliminateCrossJoin { | ||
fn try_optimize( | ||
&self, | ||
plan: &LogicalPlan, | ||
config: &dyn OptimizerConfig, | ||
) -> Result<Option<LogicalPlan>> { | ||
match plan { | ||
let mut possible_join_keys: Vec<(Expr, Expr)> = vec![]; | ||
let mut all_inputs: Vec<LogicalPlan> = vec![]; | ||
let mut parent_predicate = None; | ||
if !match plan { | ||
LogicalPlan::Filter(filter) => { | ||
let input = filter.input.as_ref().clone(); | ||
|
||
let mut possible_join_keys: Vec<(Expr, Expr)> = vec![]; | ||
let mut all_inputs: Vec<LogicalPlan> = vec![]; | ||
let did_flat_successfully = match &input { | ||
let input = filter.input.as_ref(); | ||
parent_predicate = Some(&filter.predicate); | ||
match input { | ||
LogicalPlan::Join(Join { | ||
join_type: JoinType::Inner, | ||
.. | ||
}) | ||
| LogicalPlan::CrossJoin(_) => try_flatten_join_inputs( | ||
&input, | ||
&mut possible_join_keys, | ||
&mut all_inputs, | ||
)?, | ||
| LogicalPlan::CrossJoin(_) => { | ||
let success = try_flatten_join_inputs( | ||
input, | ||
&mut possible_join_keys, | ||
&mut all_inputs, | ||
)?; | ||
if success { | ||
extract_possible_join_keys( | ||
&filter.predicate, | ||
&mut possible_join_keys, | ||
)?; | ||
} | ||
success | ||
} | ||
_ => { | ||
return utils::optimize_children(self, plan, config); | ||
} | ||
}; | ||
|
||
if !did_flat_successfully { | ||
return Ok(None); | ||
} | ||
} | ||
LogicalPlan::Join(Join { | ||
join_type: JoinType::Inner, | ||
.. | ||
}) => { | ||
try_flatten_join_inputs(plan, &mut possible_join_keys, &mut all_inputs)? | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like a missing case of reordering joins to eliminate CrossJoin, do we have a test case can cover this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you expand on this so we also address it in the follow-on PR (in case you see a bug or gap here)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried to add a simple test case for this feature. However, I couldn't write a simple test to reproduce this. However, without this reordering support test fails. If I can come up with a simple test, I will add it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I meant previously this rule doesn't cover the case of reordering joins to eliminate CrossJoin without a Filter on top of Join (because it did only match |
||
_ => return utils::optimize_children(self, plan, config), | ||
} { | ||
return Ok(None); | ||
} | ||
|
||
let predicate = &filter.predicate; | ||
// join keys are handled locally | ||
let mut all_join_keys: HashSet<(Expr, Expr)> = HashSet::new(); | ||
|
||
extract_possible_join_keys(predicate, &mut possible_join_keys)?; | ||
// Join keys are handled locally: | ||
let mut all_join_keys = HashSet::<(Expr, Expr)>::new(); | ||
let mut left = all_inputs.remove(0); | ||
while !all_inputs.is_empty() { | ||
left = find_inner_join( | ||
&left, | ||
&mut all_inputs, | ||
&mut possible_join_keys, | ||
&mut all_join_keys, | ||
)?; | ||
} | ||
|
||
let mut left = all_inputs.remove(0); | ||
while !all_inputs.is_empty() { | ||
left = find_inner_join( | ||
&left, | ||
&mut all_inputs, | ||
&mut possible_join_keys, | ||
&mut all_join_keys, | ||
)?; | ||
} | ||
left = utils::optimize_children(self, &left, config)?.unwrap_or(left); | ||
|
||
left = utils::optimize_children(self, &left, config)?.unwrap_or(left); | ||
if plan.schema() != left.schema() { | ||
left = LogicalPlan::Projection(Projection::new_from_schema( | ||
Arc::new(left), | ||
plan.schema().clone(), | ||
)); | ||
} | ||
|
||
if plan.schema() != left.schema() { | ||
left = LogicalPlan::Projection(Projection::new_from_schema( | ||
Arc::new(left.clone()), | ||
plan.schema().clone(), | ||
)); | ||
} | ||
let Some(predicate) = parent_predicate else { | ||
return Ok(Some(left)); | ||
}; | ||
|
||
// if there are no join keys then do nothing. | ||
if all_join_keys.is_empty() { | ||
Ok(Some(LogicalPlan::Filter(Filter::try_new( | ||
predicate.clone(), | ||
Arc::new(left), | ||
)?))) | ||
} else { | ||
// remove join expressions from filter | ||
match remove_join_expressions(predicate, &all_join_keys)? { | ||
Some(filter_expr) => Ok(Some(LogicalPlan::Filter( | ||
Filter::try_new(filter_expr, Arc::new(left))?, | ||
))), | ||
_ => Ok(Some(left)), | ||
} | ||
} | ||
// If there are no join keys then do nothing: | ||
if all_join_keys.is_empty() { | ||
Filter::try_new(predicate.clone(), Arc::new(left)) | ||
.map(|f| Some(LogicalPlan::Filter(f))) | ||
} else { | ||
// Remove join expressions from filter: | ||
match remove_join_expressions(predicate, &all_join_keys)? { | ||
Some(filter_expr) => Filter::try_new(filter_expr, Arc::new(left)) | ||
.map(|f| Some(LogicalPlan::Filter(f))), | ||
_ => Ok(Some(left)), | ||
} | ||
|
||
_ => utils::optimize_children(self, plan, config), | ||
} | ||
} | ||
|
||
|
@@ -325,17 +335,16 @@ fn remove_join_expressions( | |
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use crate::optimizer::OptimizerContext; | ||
use crate::test::*; | ||
|
||
use datafusion_expr::{ | ||
binary_expr, col, lit, | ||
logical_plan::builder::LogicalPlanBuilder, | ||
Operator::{And, Or}, | ||
}; | ||
|
||
use crate::optimizer::OptimizerContext; | ||
use crate::test::*; | ||
|
||
use super::*; | ||
|
||
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: Vec<&str>) { | ||
let rule = EliminateCrossJoin::new(); | ||
let optimized_plan = rule | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,25 +15,29 @@ | |
//! [`PushDownFilter`] Moves filters so they are applied as early as possible in | ||
//! the plan. | ||
|
||
use std::collections::{HashMap, HashSet}; | ||
use std::sync::Arc; | ||
|
||
use crate::optimizer::ApplyOrder; | ||
use crate::{OptimizerConfig, OptimizerRule}; | ||
|
||
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; | ||
use datafusion_common::{ | ||
internal_err, plan_datafusion_err, Column, DFSchema, DataFusionError, Result, | ||
internal_err, plan_datafusion_err, Column, DFSchema, DFSchemaRef, DataFusionError, | ||
JoinConstraint, Result, | ||
}; | ||
use datafusion_expr::expr::Alias; | ||
use datafusion_expr::expr_rewriter::replace_col; | ||
use datafusion_expr::logical_plan::{ | ||
CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union, | ||
}; | ||
use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned}; | ||
use datafusion_expr::Volatility; | ||
use datafusion_expr::{ | ||
and, | ||
expr_rewriter::replace_col, | ||
logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union}, | ||
or, BinaryExpr, Expr, Filter, Operator, ScalarFunctionDefinition, | ||
TableProviderFilterPushDown, | ||
and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator, | ||
ScalarFunctionDefinition, TableProviderFilterPushDown, Volatility, | ||
}; | ||
|
||
use itertools::Itertools; | ||
use std::collections::{HashMap, HashSet}; | ||
use std::sync::Arc; | ||
|
||
/// Optimizer rule for pushing (moving) filter expressions down in a plan so | ||
/// they are applied as early as possible. | ||
|
@@ -848,17 +852,23 @@ impl OptimizerRule for PushDownFilter { | |
None => return Ok(None), | ||
} | ||
} | ||
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { | ||
LogicalPlan::CrossJoin(cross_join) => { | ||
let predicates = split_conjunction_owned(filter.predicate.clone()); | ||
push_down_all_join( | ||
let join = convert_cross_join_to_inner_join(cross_join.clone())?; | ||
let join_plan = LogicalPlan::Join(join); | ||
let inputs = join_plan.inputs(); | ||
let left = inputs[0]; | ||
let right = inputs[1]; | ||
let plan = push_down_all_join( | ||
predicates, | ||
vec![], | ||
&filter.input, | ||
&join_plan, | ||
left, | ||
right, | ||
vec![], | ||
false, | ||
)? | ||
true, | ||
)?; | ||
convert_to_cross_join_if_beneficial(plan)? | ||
} | ||
LogicalPlan::TableScan(scan) => { | ||
let filter_predicates = split_conjunction(&filter.predicate); | ||
|
@@ -955,6 +965,36 @@ impl PushDownFilter { | |
} | ||
} | ||
|
||
/// Convert cross join to join by pushing down filter predicate to the join condition | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like this function only does "Convert cross join to join"? "pushing down filter predicate" is done by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exactly, I have updated the comment to reflect this |
||
fn convert_cross_join_to_inner_join(cross_join: CrossJoin) -> Result<Join> { | ||
let CrossJoin { left, right, .. } = cross_join; | ||
let join_schema = build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?; | ||
// predicate is given | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this comment put wrong place? |
||
Ok(Join { | ||
left, | ||
right, | ||
join_type: JoinType::Inner, | ||
join_constraint: JoinConstraint::On, | ||
on: vec![], | ||
filter: None, | ||
schema: DFSchemaRef::new(join_schema), | ||
null_equals_null: true, | ||
}) | ||
} | ||
|
||
/// Converts the inner join with empty equality predicate and empty filter condition to the cross join | ||
fn convert_to_cross_join_if_beneficial(plan: LogicalPlan) -> Result<LogicalPlan> { | ||
if let LogicalPlan::Join(join) = &plan { | ||
// Can be converted back to cross join | ||
if join.on.is_empty() && join.filter.is_none() { | ||
return LogicalPlanBuilder::from(join.left.as_ref().clone()) | ||
.cross_join(join.right.as_ref().clone())? | ||
.build(); | ||
} | ||
} | ||
Ok(plan) | ||
} | ||
Comment on lines
+986
to
+996
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be more precise filter can be pushed down below the join completely. In this case we may end up with joins empty equality predicate and empty filter condition.
will be converted to the plan below after
this util ensures that plan above is converted to the plan below
However, if the original plan were
after
in this case join top cannot be converted to the cross join. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean if after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I know, after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is how if keep_predicates.is_empty() {
Ok(plan)
} else {
// wrap the join on the filter whose predicates must be kept
match conjunction(keep_predicates) {
Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new( // Filter on top on Join
predicate,
Arc::new(plan), // this is Join
)?)),
None => Ok(plan),
}
} And after it,
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And, the reason that the predicate is kept in Filter in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you are right. We will discuss and file a follow-on PR today to fix. |
||
|
||
/// replaces columns by its name on the projection. | ||
pub fn replace_cols_by_name( | ||
e: Expr, | ||
|
@@ -1026,22 +1066,25 @@ fn contain(e: &Expr, check_map: &HashMap<String, Expr>) -> bool { | |
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::fmt::{Debug, Formatter}; | ||
use std::sync::Arc; | ||
|
||
use super::*; | ||
use crate::optimizer::Optimizer; | ||
use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate; | ||
use crate::test::*; | ||
use crate::OptimizerContext; | ||
|
||
use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; | ||
use async_trait::async_trait; | ||
use datafusion_common::{DFSchema, DFSchemaRef}; | ||
use datafusion_expr::logical_plan::table_scan; | ||
use datafusion_expr::{ | ||
and, col, in_list, in_subquery, lit, logical_plan::JoinType, or, random, sum, | ||
BinaryExpr, Expr, Extension, LogicalPlanBuilder, Operator, TableSource, | ||
TableType, UserDefinedLogicalNodeCore, | ||
}; | ||
use std::fmt::{Debug, Formatter}; | ||
use std::sync::Arc; | ||
|
||
use async_trait::async_trait; | ||
|
||
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { | ||
crate::test::assert_optimized_plan_eq( | ||
|
@@ -2665,14 +2708,12 @@ Projection: a, b | |
.cross_join(right)? | ||
.filter(filter)? | ||
.build()?; | ||
|
||
let expected = "\ | ||
Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c < UInt32(10)\ | ||
\n CrossJoin:\ | ||
\n Projection: test.a, test.b, test.c\ | ||
\n TableScan: test, full_filters=[test.b > UInt32(1) OR test.c < UInt32(10)]\ | ||
\n Projection: test1.a AS d, test1.a AS e\ | ||
\n TableScan: test1"; | ||
Inner Join: Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c < UInt32(10)\ | ||
\n Projection: test.a, test.b, test.c\ | ||
\n TableScan: test, full_filters=[test.b > UInt32(1) OR test.c < UInt32(10)]\ | ||
\n Projection: test1.a AS d, test1.a AS e\ | ||
\n TableScan: test1"; | ||
assert_optimized_plan_eq_with_rewrite_predicate(&plan, expected)?; | ||
|
||
// Originally global state which can help to avoid duplicate Filters been generated and pushed down. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current doc (of PushDownFilter) doesn't have such example, maybe update it with this change?