Filter pushdown into cross join #8626

Merged 5 commits on Dec 25, 2023
128 changes: 70 additions & 58 deletions datafusion/optimizer/src/eliminate_cross_join.rs
@@ -20,6 +20,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use crate::{utils, OptimizerConfig, OptimizerRule};

use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_expr::expr::{BinaryExpr, Expr};
use datafusion_expr::logical_plan::{
@@ -47,81 +48,93 @@ impl EliminateCrossJoin {
/// For above queries, the join predicate is available in filters and they are moved to
/// join nodes appropriately
/// This fix helps to improve the performance of TPCH Q19. issue#78
Member
@viirya viirya Dec 23, 2023

The current doc (of PushDownFilter) doesn't have such an example. Maybe update it with this change?

///
impl OptimizerRule for EliminateCrossJoin {
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
let mut possible_join_keys: Vec<(Expr, Expr)> = vec![];
let mut all_inputs: Vec<LogicalPlan> = vec![];
let parent_predicate = match plan {
LogicalPlan::Filter(filter) => {
let input = filter.input.as_ref().clone();

let mut possible_join_keys: Vec<(Expr, Expr)> = vec![];
let mut all_inputs: Vec<LogicalPlan> = vec![];
let did_flat_successfully = match &input {
let input = filter.input.as_ref();
match input {
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
})
| LogicalPlan::CrossJoin(_) => try_flatten_join_inputs(
&input,
&mut possible_join_keys,
&mut all_inputs,
)?,
| LogicalPlan::CrossJoin(_) => {
if !try_flatten_join_inputs(
input,
&mut possible_join_keys,
&mut all_inputs,
)? {
return Ok(None);
}
extract_possible_join_keys(
&filter.predicate,
&mut possible_join_keys,
)?;
Some(&filter.predicate)
}
_ => {
return utils::optimize_children(self, plan, config);
}
};

if !did_flat_successfully {
}
}
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
}) => {
if !try_flatten_join_inputs(
plan,
&mut possible_join_keys,
&mut all_inputs,
)? {
return Ok(None);
}
None
}
Member
@viirya viirya Dec 23, 2023

This looks like a previously missing case of reordering joins to eliminate CrossJoin. Do we have a test case that covers this?

Contributor

Can you expand on this so we also address it in the follow-on PR (in case you see a bug or gap here)?

Contributor Author

I tried to add a simple test case for this feature, but I couldn't write one that reproduces it. Without this reordering support, however, a test fails. If I can come up with a simple test, I will add it.

Member

Oh, I meant that previously this rule didn't cover the case of reordering joins to eliminate CrossJoin without a Filter on top of the Join, because it only matched LogicalPlan::Filter(filter). If the new LogicalPlan::Join match arm is meant to address that, then we had no test case covering it before, and we may need to add one.
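
A minimal sketch of the match structure under discussion, on a toy plan type. The Plan enum and the flatten / flatten_root helpers are hypothetical stand-ins (not DataFusion's LogicalPlan or try_flatten_join_inputs), and string predicates stand in for Expr. The point is only to show that a bare inner join at the root is now flattened with no parent predicate, which is the path a new test would need to exercise.

```rust
/// Equality join key such as ("a.id", "c.id"). Hypothetical toy representation.
type Key = (&'static str, &'static str);

/// Toy stand-in for a logical plan (not DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(&'static str),
    CrossJoin(Box<Plan>, Box<Plan>),
    InnerJoin(Box<Plan>, Box<Plan>, Vec<Key>),
    Filter(&'static str, Box<Plan>),
}

/// Collects the leaf inputs and equality keys of a tree of inner/cross joins.
fn flatten(plan: &Plan, keys: &mut Vec<Key>, inputs: &mut Vec<Plan>) {
    match plan {
        Plan::CrossJoin(l, r) => {
            flatten(l, keys, inputs);
            flatten(r, keys, inputs);
        }
        Plan::InnerJoin(l, r, on) => {
            keys.extend(on.iter().copied());
            flatten(l, keys, inputs);
            flatten(r, keys, inputs);
        }
        // Anything else is treated as an opaque join input.
        other => inputs.push(other.clone()),
    }
}

/// Mirrors the shape of the new `match`: returns the flattened inputs, the
/// collected keys, and the parent predicate (present only if the root was a
/// Filter). `None` means the rule does not apply at this node.
fn flatten_root(plan: &Plan) -> Option<(Vec<Plan>, Vec<Key>, Option<&'static str>)> {
    let mut keys = Vec::new();
    let mut inputs = Vec::new();
    let parent_predicate = match plan {
        Plan::Filter(predicate, input) => match input.as_ref() {
            Plan::InnerJoin(..) | Plan::CrossJoin(..) => {
                flatten(input, &mut keys, &mut inputs);
                Some(*predicate)
            }
            _ => return None,
        },
        // New arm: a bare inner join with no Filter above it.
        Plan::InnerJoin(..) => {
            flatten(plan, &mut keys, &mut inputs);
            None
        }
        _ => return None,
    };
    Some((inputs, keys, parent_predicate))
}
```

With this shape, an inner join sitting directly on top of a cross join is flattened into three inputs and carries no parent predicate, so the final step can return the rebuilt join tree without wrapping it in a Filter.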

_ => return utils::optimize_children(self, plan, config),
};

let predicate = &filter.predicate;
// join keys are handled locally
let mut all_join_keys: HashSet<(Expr, Expr)> = HashSet::new();

extract_possible_join_keys(predicate, &mut possible_join_keys)?;
// Join keys are handled locally:
let mut all_join_keys = HashSet::<(Expr, Expr)>::new();
let mut left = all_inputs.remove(0);
while !all_inputs.is_empty() {
left = find_inner_join(
&left,
&mut all_inputs,
&mut possible_join_keys,
&mut all_join_keys,
)?;
}

let mut left = all_inputs.remove(0);
while !all_inputs.is_empty() {
left = find_inner_join(
&left,
&mut all_inputs,
&mut possible_join_keys,
&mut all_join_keys,
)?;
}
left = utils::optimize_children(self, &left, config)?.unwrap_or(left);

left = utils::optimize_children(self, &left, config)?.unwrap_or(left);
if plan.schema() != left.schema() {
left = LogicalPlan::Projection(Projection::new_from_schema(
Arc::new(left),
plan.schema().clone(),
));
}

if plan.schema() != left.schema() {
left = LogicalPlan::Projection(Projection::new_from_schema(
Arc::new(left.clone()),
plan.schema().clone(),
));
}
let Some(predicate) = parent_predicate else {
return Ok(Some(left));
};

// if there are no join keys then do nothing.
if all_join_keys.is_empty() {
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate.clone(),
Arc::new(left),
)?)))
} else {
// remove join expressions from filter
match remove_join_expressions(predicate, &all_join_keys)? {
Some(filter_expr) => Ok(Some(LogicalPlan::Filter(
Filter::try_new(filter_expr, Arc::new(left))?,
))),
_ => Ok(Some(left)),
}
}
// If there are no join keys then do nothing:
if all_join_keys.is_empty() {
Filter::try_new(predicate.clone(), Arc::new(left))
.map(|f| Some(LogicalPlan::Filter(f)))
} else {
// Remove join expressions from filter:
match remove_join_expressions(predicate, &all_join_keys)? {
Some(filter_expr) => Filter::try_new(filter_expr, Arc::new(left))
.map(|f| Some(LogicalPlan::Filter(f))),
_ => Ok(Some(left)),
}

_ => utils::optimize_children(self, plan, config),
}
}

@@ -325,17 +338,16 @@ fn remove_join_expressions(

#[cfg(test)]
mod tests {
use super::*;
use crate::optimizer::OptimizerContext;
use crate::test::*;

use datafusion_expr::{
binary_expr, col, lit,
logical_plan::builder::LogicalPlanBuilder,
Operator::{And, Or},
};

use crate::optimizer::OptimizerContext;
use crate::test::*;

use super::*;

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: Vec<&str>) {
let rule = EliminateCrossJoin::new();
let optimized_plan = rule
89 changes: 65 additions & 24 deletions datafusion/optimizer/src/push_down_filter.rs
@@ -15,25 +15,29 @@
//! [`PushDownFilter`] Moves filters so they are applied as early as possible in
//! the plan.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{
internal_err, plan_datafusion_err, Column, DFSchema, DataFusionError, Result,
internal_err, plan_datafusion_err, Column, DFSchema, DFSchemaRef, DataFusionError,
JoinConstraint, Result,
};
use datafusion_expr::expr::Alias;
use datafusion_expr::expr_rewriter::replace_col;
use datafusion_expr::logical_plan::{
CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union,
};
use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned};
use datafusion_expr::Volatility;
use datafusion_expr::{
and,
expr_rewriter::replace_col,
logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union},
or, BinaryExpr, Expr, Filter, Operator, ScalarFunctionDefinition,
TableProviderFilterPushDown,
and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator,
ScalarFunctionDefinition, TableProviderFilterPushDown, Volatility,
};

use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

/// Optimizer rule for pushing (moving) filter expressions down in a plan so
/// they are applied as early as possible.
@@ -848,17 +852,23 @@ impl OptimizerRule for PushDownFilter {
None => return Ok(None),
}
}
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
LogicalPlan::CrossJoin(cross_join) => {
let predicates = split_conjunction_owned(filter.predicate.clone());
push_down_all_join(
let join = convert_cross_join_to_inner_join(cross_join.clone())?;
let join_plan = LogicalPlan::Join(join);
let inputs = join_plan.inputs();
let left = inputs[0];
let right = inputs[1];
let plan = push_down_all_join(
predicates,
vec![],
&filter.input,
&join_plan,
left,
right,
vec![],
false,
)?
true,
)?;
convert_to_cross_join_if_beneficial(plan)?
}
LogicalPlan::TableScan(scan) => {
let filter_predicates = split_conjunction(&filter.predicate);
@@ -955,6 +965,36 @@ impl PushDownFilter {
}
}

/// Convert cross join to join by pushing down filter predicate to the join condition
Member

Looks like this function only does "Convert cross join to join"?

"pushing down filter predicate" is done by push_down_all_join. Newbies might be confused by this.

Contributor Author
@mustafasrepo mustafasrepo Dec 25, 2023

Exactly, I have updated the comment to reflect this

fn convert_cross_join_to_inner_join(cross_join: CrossJoin) -> Result<Join> {
let CrossJoin { left, right, .. } = cross_join;
let join_schema = build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?;
// predicate is given
Member

Is this comment in the wrong place?

Ok(Join {
left,
right,
join_type: JoinType::Inner,
join_constraint: JoinConstraint::On,
on: vec![],
filter: None,
schema: DFSchemaRef::new(join_schema),
null_equals_null: true,
})
}

/// Converts the inner join with empty equality predicate and empty filter condition to the cross join
fn convert_to_cross_join_if_beneficial(plan: LogicalPlan) -> Result<LogicalPlan> {
if let LogicalPlan::Join(join) = &plan {
// Can be converted back to cross join
if join.on.is_empty() && join.filter.is_none() {
return LogicalPlanBuilder::from(join.left.as_ref().clone())
.cross_join(join.right.as_ref().clone())?
.build();
}
}
Ok(plan)
}
Comment on lines +986 to +996
Member

Hmm, after push_down_all_join, any predicates that cannot become join conditions are put into a Filter on top of the join node. In that case, will this function skip converting the underlying join to a cross join?

Contributor Author

To be more precise, the filter can be pushed down below the join completely. In this case we may end up with a join that has an empty equality predicate and an empty filter condition.
As an example,

Filter(l.a>l.b AND r.a>r.b)
--Join (on=[], filter=None)
----LeftTable(a, b)
----RightTable(a, b)

will be converted to the plan below after push_down_all_join:

Join (on=[], filter=None)
--Filter(l.a>l.b)
----LeftTable(a, b)
--Filter(r.a>r.b)
----RightTable(a, b)

This util ensures that the plan above is converted to the plan below:

CrossJoin
--Filter(l.a>l.b)
----LeftTable(a, b)
--Filter(r.a>r.b)
----RightTable(a, b)

However, if the original plan were

Filter(l.a>r.b)
--Join (on=[], filter=None)
----LeftTable(a, b)
----RightTable(a, b)

then after push_down_all_join we would end up with the following plan:

Join (on=[], filter=Some(l.a>r.b))
--LeftTable(a, b)
--RightTable(a, b)

In this case the top join cannot be converted to a cross join.
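
The two cases above can be sketched on a toy plan type. This is an illustration under assumptions: the Plan enum below is hypothetical, string predicates stand in for Expr, and only the condition itself (empty on, no filter) is taken from the diff.

```rust
/// Toy plan type (hypothetical; a stand-in for DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Table(&'static str),
    Filter {
        predicate: &'static str,
        input: Box<Plan>,
    },
    Join {
        on: Vec<(&'static str, &'static str)>,
        filter: Option<&'static str>,
        left: Box<Plan>,
        right: Box<Plan>,
    },
    CrossJoin {
        left: Box<Plan>,
        right: Box<Plan>,
    },
}

/// Turns a top-level join with no equality keys and no filter back into a
/// cross join. Every other plan is returned unchanged.
fn convert_to_cross_join_if_beneficial(plan: Plan) -> Plan {
    match plan {
        Plan::Join { on, filter: None, left, right } if on.is_empty() => {
            Plan::CrossJoin { left, right }
        }
        other => other,
    }
}
```

When both conjuncts have been pushed below the join, the join has nothing left to evaluate and becomes a CrossJoin again. When l.a>r.b has been absorbed as the join filter, the plan stays an inner join.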

Member

I mean: if, after push_down_all_join, there are predicates remaining in a Filter but the Join has an empty on and an empty filter, then because you match the plan with if let LogicalPlan::Join(join) = &plan, the Join won't be converted to a CrossJoin even though it looks like it should be (based on the logic here).

Contributor Author

As far as I know, after push_down_all_join, any predicates remaining in the Filter are pushed into the join filter. Hence, we can be sure that the top operator is a Join.

Member
@viirya viirya Dec 25, 2023

This is how push_down_all_join handles remaining predicates in Filter:

if keep_predicates.is_empty() {
  Ok(plan)
} else {
  // wrap the join on the filter whose predicates must be kept
  match conjunction(keep_predicates) {
    Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new( // Filter on top on Join
      predicate,
      Arc::new(plan),  // this is Join
    )?)),
    None => Ok(plan),
  }
}

After that, convert_to_cross_join_if_beneficial is called. Where does the Filter get pushed down? Am I missing something?

let plan = push_down_all_join(
  predicates,
  vec![],
  &join_plan,
  left,
  right,
  vec![],
  true,
)?;
convert_to_cross_join_if_beneficial(plan)?

Member

Also, the predicate is kept in the Filter by push_down_all_join precisely because it cannot be pushed into the Join filter or pushed down through the Join. How would it get pushed into the join, or pushed down, later?

Contributor

I think you are right. We will discuss and file a follow-on PR today to fix.
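
To illustrate the gap, here is a hedged sketch of one way the conversion could look through a Filter left on top of the join. It uses a hypothetical toy Plan type with string predicates in place of Expr, and the recursion through Filter is an assumption for illustration, not necessarily what the follow-on PR does.

```rust
/// Toy plan type (hypothetical; a stand-in for DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Table(&'static str),
    Filter {
        predicate: &'static str,
        input: Box<Plan>,
    },
    Join {
        on: Vec<(&'static str, &'static str)>,
        filter: Option<&'static str>,
        left: Box<Plan>,
        right: Box<Plan>,
    },
    CrossJoin {
        left: Box<Plan>,
        right: Box<Plan>,
    },
}

/// Behaviour as in the diff: only a Join at the very top is considered.
fn convert_top_level_only(plan: Plan) -> Plan {
    match plan {
        Plan::Join { on, filter: None, left, right } if on.is_empty() => {
            Plan::CrossJoin { left, right }
        }
        other => other,
    }
}

/// Assumed variant: also descend through Filter nodes that were kept on top
/// of the join, and convert the join underneath them.
fn convert_through_filter(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate,
            input: Box::new(convert_through_filter(*input)),
        },
        other => convert_top_level_only(other),
    }
}
```

For a Filter kept on top of a join with empty on and no filter, convert_top_level_only returns the plan unchanged, while convert_through_filter rewrites the join below the Filter into a CrossJoin.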


/// replaces columns by its name on the projection.
pub fn replace_cols_by_name(
e: Expr,
@@ -1026,22 +1066,25 @@ fn contain(e: &Expr, check_map: &HashMap<String, Expr>) -> bool {

#[cfg(test)]
mod tests {
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use super::*;
use crate::optimizer::Optimizer;
use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
use crate::test::*;
use crate::OptimizerContext;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::{DFSchema, DFSchemaRef};
use datafusion_expr::logical_plan::table_scan;
use datafusion_expr::{
and, col, in_list, in_subquery, lit, logical_plan::JoinType, or, random, sum,
BinaryExpr, Expr, Extension, LogicalPlanBuilder, Operator, TableSource,
TableType, UserDefinedLogicalNodeCore,
};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use async_trait::async_trait;

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
crate::test::assert_optimized_plan_eq(
@@ -2665,14 +2708,12 @@ Projection: a, b
.cross_join(right)?
.filter(filter)?
.build()?;

let expected = "\
Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c < UInt32(10)\
\n CrossJoin:\
\n Projection: test.a, test.b, test.c\
\n TableScan: test, full_filters=[test.b > UInt32(1) OR test.c < UInt32(10)]\
\n Projection: test1.a AS d, test1.a AS e\
\n TableScan: test1";
Inner Join: Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c < UInt32(10)\
\n Projection: test.a, test.b, test.c\
\n TableScan: test, full_filters=[test.b > UInt32(1) OR test.c < UInt32(10)]\
\n Projection: test1.a AS d, test1.a AS e\
\n TableScan: test1";
assert_optimized_plan_eq_with_rewrite_predicate(&plan, expected)?;

// Originally global state which can help to avoid duplicate Filters been generated and pushed down.
17 changes: 17 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
@@ -3466,6 +3466,23 @@ SortPreservingMergeExec: [a@0 ASC]
----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], has_header=true

query TT
EXPLAIN SELECT *
FROM annotated_data as l, annotated_data as r
WHERE l.a > r.a
----
logical_plan
Inner Join: Filter: l.a > r.a
--SubqueryAlias: l
----TableScan: annotated_data projection=[a0, a, b, c, d]
--SubqueryAlias: r
----TableScan: annotated_data projection=[a0, a, b, c, d]
physical_plan
NestedLoopJoinExec: join_type=Inner, filter=a@0 > a@1
--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true

####
# Config teardown
####