-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support non-tuple expression for in-subquery to join #4826
Conversation
Please take a look, @alamb @jackwener. |
subquery_filters.push(expr.clone()); | ||
} else { | ||
join_filters.push(expr.clone()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the expression depends on the outer table, we should move it to the join predicate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah -- this is the decorrelation logic
.project(projection)? | ||
.alias(&subqry_alias)? | ||
let subquery_expr_name = format!("{:?}", unnormalize_col(subquery_expr.clone())); | ||
let first_expr = subquery_expr.clone().alias(subquery_expr_name.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the right side of the in predicate, it looks strange if we show the original qualify name in the plan, so the qualify name is stripped.
I plan to review this tomorrow. THank you @ygf11 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prepare to review this PR more carefully tonight or tomorrow.
let expected = vec![ | ||
"+-------+---------+--------+", | ||
"| t1_id | t1_name | t1_int |", | ||
"+-------+---------+--------+", | ||
"| 11 | a | 1 |", | ||
"| 33 | c | 3 |", | ||
"| 44 | d | 4 |", | ||
"+-------+---------+--------+", | ||
]; | ||
|
||
let results = execute_to_batches(&ctx, sql).await; | ||
assert_batches_sorted_eq!(expected, &results); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now we already use sqllogicaltest
to support Data Driven Tests
#4460.
I suggest add a file join.slt
| subquery.slt
to cover some case of join
and subquery
.
In UT and integration-test, we just focus on correctness of Plan
, data-test is derived by sqllogicaltest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File a ticket #4870.
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ | ||
\n LeftSemi Join: Filter: customer.c_custkey + Int32(1) = __correlated_sq_1.o_custkey AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ | ||
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ | ||
\n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\ | ||
\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ | ||
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; | ||
|
||
assert_optimized_plan_eq_display_indent( | ||
Arc::new(DecorrelateWhereIn::new()), | ||
&plan, | ||
"column comparison required", | ||
expected, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Sorry for the delay in my review. I am traveling this week at a conference so I don't have as much time to devote to reviews and merging as normal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good @ygf11 -- thank you. 👌
The only question I had was about queries that had two subqueries and it appears the predicate filter has been pulled up too far. Otherwise this one is looking
|
||
let expected = vec![ | ||
"Explain [plan_type:Utf8, plan:Utf8]", | ||
" Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 the plan looks good
SubqueryAlias: __correlated_sq_1 | ||
Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey | ||
TableScan: lineitem projection=[l_orderkey, l_linestatus]"#; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is just whitespace, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not only whitespace
, the alias except the first expression are removed in the projection.
before:
SubqueryAlias: __correlated_sq_1
Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey
after:
\n SubqueryAlias: __correlated_sq_1\
\n Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey\
The first expression of the projection is the right side of in-equijoin
, it may be a non-column expression, so always give it an alias, but others do not(they are alway column).
} | ||
|
||
#[tokio::test] | ||
async fn three_projection_exprs_subquery_to_join() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a minor point, I am not sure what the extra coverage the multiple predicates in the where clause are adding
I wonder if we need all these tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added more tests -- multiple predicates in outer and subquery where clause.
@@ -121,80 +121,95 @@ impl OptimizerRule for DecorrelateWhereIn { | |||
} | |||
} | |||
|
|||
/// Optimize the where in subquery to left-anti/left-semi join. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ | ||
\n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ | ||
\n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like somewhat of a regression to me (the filter has been pulled out of the subquery and into the join). Is that intended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I do not handle this special case, just treat customer.c_custkey = customer.c_custkey
as join filter.
I think it is better to add this to the filter of outer table, then the plan will be like:
"Projection: customer.c_custkey [c_custkey:Int64]\
LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
Filter: customer.c_custkey = customer.c_custkey [..]\
TableScan: customer [c_custkey:Int64, c_name:Utf8]\
SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"
Is this the right direction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After more thought it probably makes sense to do this decorrelation in the general way (no special case0 as you have done and then we will rely on subsequent predicate pushdown logic to push the filters back down if possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is better to do by other rule.
But I find our predicate pushdown logic
does not support these predicates(semi-join-on) now.
https://github.com/apache/arrow-datafusion/blob/dee0dd8be6745f6cb798ba56dca6c1b936d90fd6/datafusion/optimizer/src/push_down_filter.rs#L103-L115
Maybe we should support predicate push down
for semi-join-on
, or we need move these predicates to the outside of join, then predicate push down
can work(Like following).
// for sql: select * from t1 where t1.t1_int in(select t2.t2_int from t2 where t1.t1_id > 10);
// the output of this rule:
Projection: t1.t1_id, t1.t1_name, t1.t1_int
Filter: CAST(t1.t1_id AS Int64) > Int64(10)
LeftSemi Join: Filter: t1.t1_int = __correlated_sq_2.t2_int
TableScan: t1
SubqueryAlias: __correlated_sq_2
Projection: t2.t2_int AS t2_int
TableScan: t2
let using_cols: Vec<Column> = expr | ||
.to_columns()? | ||
.into_iter() | ||
.filter(|col| input_schema.field_from_column(col).is_ok()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't this ignore columns that are not found in the subquery? I wonder if the error should be percolated up rather than ignored? Perhaps test coverage of a query like select ... from t where x in (select non_existent_col from foo)
would be good?
Although perhaps if it was an error it signals something is wrong in the plan (as a column reference wouldn't be present in the input schema 🤔 )
If this shouldn't happen, maybe we can collect the errors and return them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't this ignore columns that are not found in the subquery
Yes, it will ignore. But it is used to collect columns which will be add to subquery projection.
Suppose there is a query:
select * from t where x in (select c from foo where foo.a > t.a and foo.b > t.a)
foo.a > t.a and foo.b > t.a
will move to join
, to make the join work, we need add foo.a
and foo.b
to the projection of subquery, the above code does the filter work.
If the where clause references unknown columns, we need do more work here, but I think it should be done in the planner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
subquery_filters.push(expr.clone()); | ||
} else { | ||
join_filters.push(expr.clone()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah -- this is the decorrelation logic
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ | ||
\n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ | ||
\n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After more thought it probably makes sense to do this decorrelation in the general way (no special case0 as you have done and then we will rely on subsequent predicate pushdown logic to push the filters back down if possible
let expected = "Projection: test.b [b:UInt32]\ | ||
\n Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]\ | ||
\n LeftSemi Join: Filter: test.c * UInt32(2) = __correlated_sq_2.c * UInt32(2) AND test.a > __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]\ | ||
\n Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filter: test.c > UInt32(1) happen twice, it is better to only add once #4914.
I will fix it in the following pr.
" SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]", | ||
" Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]", | ||
" TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]", | ||
]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test case show that the special case t1.t1_int > 0
is not pushed down after optimizing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that something you would like to fix in this PR or is it something you would like to fix in a follow on?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean next pr 🤣.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a issue: #4413
Thanks for sticking with this @ygf11 |
Benchmark runs are scheduled for baseline = d49c805 and contender = e2daee9. e2daee9 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Closes #4724.
Rationale for this change
This sql works in datafusion:
but following sql does not work:
What changes are included in this PR?
optimize_where_in
indecorrelate_where_in.rs
.Are these changes tested?
Yes.
Are there any user-facing changes?