Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support non-tuple expression for in-subquery to join #4826

Merged
merged 14 commits into from
Jan 15, 2023

Conversation

ygf11
Copy link
Contributor

@ygf11 ygf11 commented Jan 5, 2023

Which issue does this PR close?

Closes #4724.

Rationale for this change

This sql works in datafusion:

> select * from t1 where t1_id in (select t2_id from t2);

but following sql does not work:

select * from t1 where t1_id + 11 in (select t2_id from t2);
NotImplemented("Physical plan does not support logical expression CAST(t1.t1_id AS Int64) + Int64(11) IN (<subquery>)")

❯ select * from t1 where t1_id  in (select t2_id + 11 from t2);
NotImplemented("Physical plan does not support logical expression t1.t1_id IN (<subquery>)")

❯ select * from t1 where t1_id + 12  in (select t2_id + 1 from t2);
NotImplemented("Physical plan does not support logical expression CAST(t1.t1_id AS Int64) + Int64(12) IN (<subquery>)")

What changes are included in this PR?

  1. Rewrite optimize_where_in in decorrelate_where_in.rs.
  2. Fix tests.

Are these changes tested?

Yes.

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules labels Jan 5, 2023
@ygf11 ygf11 marked this pull request as ready for review January 7, 2023 10:06
@ygf11
Copy link
Contributor Author

ygf11 commented Jan 7, 2023

Please take a look, @alamb @jackwener.

subquery_filters.push(expr.clone());
} else {
join_filters.push(expr.clone())
}
Copy link
Contributor Author

@ygf11 ygf11 Jan 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the expression depends on the outer table, we should move it to the join predicate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah -- this is the decorrelation logic

.project(projection)?
.alias(&subqry_alias)?
let subquery_expr_name = format!("{:?}", unnormalize_col(subquery_expr.clone()));
let first_expr = subquery_expr.clone().alias(subquery_expr_name.clone());
Copy link
Contributor Author

@ygf11 ygf11 Jan 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the right side of the in predicate, it looks strange if we show the original qualify name in the plan, so the qualify name is stripped.

@alamb
Copy link
Contributor

alamb commented Jan 7, 2023

I plan to review this tomorrow. THank you @ygf11

Copy link
Member

@jackwener jackwener left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prepare to review this PR more carefully tonight or tomorrow.

Comment on lines +2842 to +2853
let expected = vec![
"+-------+---------+--------+",
"| t1_id | t1_name | t1_int |",
"+-------+---------+--------+",
"| 11 | a | 1 |",
"| 33 | c | 3 |",
"| 44 | d | 4 |",
"+-------+---------+--------+",
];

let results = execute_to_batches(&ctx, sql).await;
assert_batches_sorted_eq!(expected, &results);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we already use sqllogicaltest to support Data Driven Tests #4460.
I suggest add a file join.slt | subquery.slt to cover some case of join and subquery.

In UT and integration-test, we just focus on correctness of Plan, data-test is derived by sqllogicaltest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File a ticket #4870.

Comment on lines +818 to +828
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n LeftSemi Join: Filter: customer.c_custkey + Int32(1) = __correlated_sq_1.o_custkey AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
\n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

assert_optimized_plan_eq_display_indent(
Arc::new(DecorrelateWhereIn::new()),
&plan,
"column comparison required",
expected,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@jackwener jackwener self-assigned this Jan 10, 2023
@alamb
Copy link
Contributor

alamb commented Jan 10, 2023

Sorry for the delay in my review. I am traveling this week at a conference so I don't have as much time to devote to reviews and merging as normal

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking good @ygf11 -- thank you. 👌

The only question I had was about queries that had two subqueries and it appears the predicate filter has been pulled up too far. Otherwise this one is looking


let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 the plan looks good

SubqueryAlias: __correlated_sq_1
Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey
TableScan: lineitem projection=[l_orderkey, l_linestatus]"#;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is just whitespace, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not only whitespace, the alias except the first expression are removed in the projection.

before:

SubqueryAlias: __correlated_sq_1
      Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey

after:

 \n    SubqueryAlias: __correlated_sq_1\
    \n      Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey\

The first expression of the projection is the right side of in-equijoin, it may be a non-column expression, so always give it an alias, but others do not(they are alway column).

}

#[tokio::test]
async fn three_projection_exprs_subquery_to_join() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a minor point, I am not sure what the extra coverage the multiple predicates in the where clause are adding

I wonder if we need all these tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more tests -- multiple predicates in outer and subquery where clause.

@@ -121,80 +121,95 @@ impl OptimizerRule for DecorrelateWhereIn {
}
}

/// Optimize the where in subquery to left-anti/left-semi join.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

datafusion/optimizer/src/decorrelate_where_in.rs Outdated Show resolved Hide resolved
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
\n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like somewhat of a regression to me (the filter has been pulled out of the subquery and into the join). Is that intended?

Copy link
Contributor Author

@ygf11 ygf11 Jan 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I do not handle this special case, just treat customer.c_custkey = customer.c_custkey as join filter.

I think it is better to add this to the filter of outer table, then the plan will be like:

"Projection: customer.c_custkey [c_custkey:Int64]\
   LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
     Filter: customer.c_custkey = customer.c_custkey [..]\
        TableScan: customer [c_custkey:Int64, c_name:Utf8]\
     SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
        Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"

Is this the right direction?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After more thought it probably makes sense to do this decorrelation in the general way (no special case0 as you have done and then we will rely on subsequent predicate pushdown logic to push the filters back down if possible

Copy link
Contributor Author

@ygf11 ygf11 Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is better to do by other rule.
But I find our predicate pushdown logic does not support these predicates(semi-join-on) now.
https://github.com/apache/arrow-datafusion/blob/dee0dd8be6745f6cb798ba56dca6c1b936d90fd6/datafusion/optimizer/src/push_down_filter.rs#L103-L115

Maybe we should support predicate push down for semi-join-on, or we need move these predicates to the outside of join, then predicate push down can work(Like following).

// for sql: select * from t1 where t1.t1_int in(select t2.t2_int from t2 where t1.t1_id > 10);
// the output of this rule:
Projection: t1.t1_id, t1.t1_name, t1.t1_int                                                                                                                                                                                                                                          
  Filter: CAST(t1.t1_id AS Int64) > Int64(10)                                                                                                                                                                                                                                          
     LeftSemi Join:  Filter: t1.t1_int = __correlated_sq_2.t2_int                                                                                                                                                                                                                          
       TableScan: t1                                                                                                                                                                                                                                                                       
       SubqueryAlias: __correlated_sq_2                                                                                                                                                                                                                                                      
         Projection: t2.t2_int AS t2_int                                                                                                                                                                                                                                                       
           TableScan: t2                                                                                                                                                                                                                                                            

datafusion/optimizer/src/decorrelate_where_in.rs Outdated Show resolved Hide resolved
let using_cols: Vec<Column> = expr
.to_columns()?
.into_iter()
.filter(|col| input_schema.field_from_column(col).is_ok())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this ignore columns that are not found in the subquery? I wonder if the error should be percolated up rather than ignored? Perhaps test coverage of a query like select ... from t where x in (select non_existent_col from foo) would be good?

Although perhaps if it was an error it signals something is wrong in the plan (as a column reference wouldn't be present in the input schema 🤔 )

If this shouldn't happen, maybe we can collect the errors and return them?

Copy link
Contributor Author

@ygf11 ygf11 Jan 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this ignore columns that are not found in the subquery

Yes, it will ignore. But it is used to collect columns which will be add to subquery projection.

Suppose there is a query:
select * from t where x in (select c from foo where foo.a > t.a and foo.b > t.a)

foo.a > t.a and foo.b > t.a will move to join, to make the join work, we need add foo.a and foo.b to the projection of subquery, the above code does the filter work.

If the where clause references unknown columns, we need do more work here, but I think it should be done in the planner.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

subquery_filters.push(expr.clone());
} else {
join_filters.push(expr.clone())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah -- this is the decorrelation logic

datafusion/optimizer/src/decorrelate_where_in.rs Outdated Show resolved Hide resolved
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
\n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After more thought it probably makes sense to do this decorrelation in the general way (no special case0 as you have done and then we will rely on subsequent predicate pushdown logic to push the filters back down if possible

let expected = "Projection: test.b [b:UInt32]\
\n Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]\
\n LeftSemi Join: Filter: test.c * UInt32(2) = __correlated_sq_2.c * UInt32(2) AND test.a > __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]\
\n Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]\
Copy link
Contributor Author

@ygf11 ygf11 Jan 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filter: test.c > UInt32(1) happen twice, it is better to only add once #4914.
I will fix it in the following pr.

" SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
" Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
" TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case show that the special case t1.t1_int > 0 is not pushed down after optimizing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that something you would like to fix in this PR or is it something you would like to fix in a follow on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean next pr 🤣.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a issue: #4413

@alamb alamb merged commit e2daee9 into apache:master Jan 15, 2023
@alamb
Copy link
Contributor

alamb commented Jan 15, 2023

Thanks for sticking with this @ygf11

@ursabot
Copy link

ursabot commented Jan 15, 2023

Benchmark runs are scheduled for baseline = d49c805 and contender = e2daee9. e2daee9 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support non-tuple expression for in-subquery to join
4 participants