-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support non-tuple expression for in-subquery to join #4826
Changes from 6 commits
10bafa9
6b929df
7767bb1
de0fdf3
d641b51
c7b7a4c
5d25827
e0055c8
08a66bb
d4187f8
f5ff04a
747957a
96d26bd
f035229
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2810,3 +2810,137 @@ async fn type_coercion_join_with_filter_and_equi_expr() -> Result<()> { | |
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn subquery_to_join_with_both_side_expr() -> Result<()> { | ||
let ctx = create_join_context("t1_id", "t2_id", false)?; | ||
|
||
let sql = "select t1.t1_id, t1.t1_name, t1.t1_int from t1 where t1.t1_id + 12 in (select t2.t2_id + 1 from t2)"; | ||
|
||
// assert logical plan | ||
let msg = format!("Creating logical plan for '{sql}'"); | ||
let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); | ||
let plan = dataframe.into_optimized_plan().unwrap(); | ||
|
||
let expected = vec![ | ||
"Explain [plan_type:Utf8, plan:Utf8]", | ||
" Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
" LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
" TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
" SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]", | ||
" Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]", | ||
" TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]", | ||
]; | ||
|
||
let formatted = plan.display_indent_schema().to_string(); | ||
let actual: Vec<&str> = formatted.trim().lines().collect(); | ||
assert_eq!( | ||
expected, actual, | ||
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" | ||
); | ||
|
||
let expected = vec![ | ||
"+-------+---------+--------+", | ||
"| t1_id | t1_name | t1_int |", | ||
"+-------+---------+--------+", | ||
"| 11 | a | 1 |", | ||
"| 33 | c | 3 |", | ||
"| 44 | d | 4 |", | ||
"+-------+---------+--------+", | ||
]; | ||
|
||
let results = execute_to_batches(&ctx, sql).await; | ||
assert_batches_sorted_eq!(expected, &results); | ||
Comment on lines
+2900
to
+2911
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now we already use In UT and integration-test, we just focus on correctness of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. File a ticket #4870. |
||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn subquery_to_join_with_muti_filter() -> Result<()> { | ||
let ctx = create_join_context("t1_id", "t2_id", false)?; | ||
|
||
let sql = "select t1.t1_id, t1.t1_name, t1.t1_int from t1 where t1.t1_id + 12 in | ||
(select t2.t2_id + 1 from t2 where t1.t1_int <= t2.t2_int and t2.t2_int > 0)"; | ||
|
||
// assert logical plan | ||
let msg = format!("Creating logical plan for '{sql}'"); | ||
let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); | ||
let plan = dataframe.into_optimized_plan().unwrap(); | ||
|
||
let expected = vec![ | ||
"Explain [plan_type:Utf8, plan:Utf8]", | ||
" Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
" LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) Filter: t1.t1_int <= __correlated_sq_1.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
" TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
" SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N, t2_int:UInt32;N]", | ||
" Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1), t2.t2_int [CAST(t2_id AS Int64) + Int64(1):Int64;N, t2_int:UInt32;N]", | ||
" Filter: t2.t2_int > UInt32(0) [t2_id:UInt32;N, t2_int:UInt32;N]", | ||
" TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]", | ||
]; | ||
|
||
let formatted = plan.display_indent_schema().to_string(); | ||
let actual: Vec<&str> = formatted.trim().lines().collect(); | ||
assert_eq!( | ||
expected, actual, | ||
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" | ||
); | ||
|
||
let expected = vec![ | ||
"+-------+---------+--------+", | ||
"| t1_id | t1_name | t1_int |", | ||
"+-------+---------+--------+", | ||
"| 11 | a | 1 |", | ||
"| 33 | c | 3 |", | ||
"+-------+---------+--------+", | ||
]; | ||
|
||
let results = execute_to_batches(&ctx, sql).await; | ||
assert_batches_sorted_eq!(expected, &results); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn three_projection_exprs_subquery_to_join() -> Result<()> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a minor point, I am not sure what the extra coverage the multiple predicates in the where clause are adding I wonder if we need all these tests? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added more tests -- multiple predicates in outer and subquery where clause. |
||
let ctx = create_join_context("t1_id", "t2_id", false)?; | ||
|
||
let sql = "select t1.t1_id, t1.t1_name, t1.t1_int from t1 where t1.t1_id + 12 in | ||
(select t2.t2_id + 1 from t2 where t1.t1_int <= t2.t2_int and t1.t1_name != t2.t2_name and t2.t2_int > 0)"; | ||
|
||
// assert logical plan | ||
let msg = format!("Creating logical plan for '{sql}'"); | ||
let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); | ||
let plan = dataframe.into_optimized_plan().unwrap(); | ||
|
||
let expected = vec![ | ||
"Explain [plan_type:Utf8, plan:Utf8]", | ||
" Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
" LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) Filter: t1.t1_int <= __correlated_sq_1.t2_int AND t1.t1_name != __correlated_sq_1.t2_name [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
" TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | ||
" SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N, t2_int:UInt32;N, t2_name:Utf8;N]", | ||
" Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1), t2.t2_int, t2.t2_name [CAST(t2_id AS Int64) + Int64(1):Int64;N, t2_int:UInt32;N, t2_name:Utf8;N]", | ||
" Filter: t2.t2_int > UInt32(0) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", | ||
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", | ||
]; | ||
|
||
let formatted = plan.display_indent_schema().to_string(); | ||
let actual: Vec<&str> = formatted.trim().lines().collect(); | ||
assert_eq!( | ||
expected, actual, | ||
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" | ||
); | ||
|
||
let expected = vec![ | ||
"+-------+---------+--------+", | ||
"| t1_id | t1_name | t1_int |", | ||
"+-------+---------+--------+", | ||
"| 11 | a | 1 |", | ||
"| 33 | c | 3 |", | ||
"+-------+---------+--------+", | ||
]; | ||
|
||
let results = execute_to_batches(&ctx, sql).await; | ||
assert_batches_sorted_eq!(expected, &results); | ||
|
||
Ok(()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,12 +94,13 @@ where o_orderstatus in ( | |
let dataframe = ctx.sql(sql).await.unwrap(); | ||
let plan = dataframe.into_optimized_plan().unwrap(); | ||
let actual = format!("{}", plan.display_indent()); | ||
let expected = r#"Projection: orders.o_orderkey | ||
LeftSemi Join: orders.o_orderstatus = __correlated_sq_1.l_linestatus, orders.o_orderkey = __correlated_sq_1.l_orderkey | ||
TableScan: orders projection=[o_orderkey, o_orderstatus] | ||
SubqueryAlias: __correlated_sq_1 | ||
Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey | ||
TableScan: lineitem projection=[l_orderkey, l_linestatus]"#; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change is just whitespace, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not only before: SubqueryAlias: __correlated_sq_1
Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey after: \n SubqueryAlias: __correlated_sq_1\
\n Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey\ The first expression of the projection is the right side of |
||
let expected = "Projection: orders.o_orderkey\ | ||
\n LeftSemi Join: orders.o_orderstatus = __correlated_sq_1.l_linestatus, orders.o_orderkey = __correlated_sq_1.l_orderkey\ | ||
\n TableScan: orders projection=[o_orderkey, o_orderstatus]\ | ||
\n SubqueryAlias: __correlated_sq_1\ | ||
\n Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey\ | ||
\n TableScan: lineitem projection=[l_orderkey, l_linestatus]"; | ||
assert_eq!(actual, expected); | ||
|
||
// assert data | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 the plan looks good