-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support type coercion for equijoin #4666
Conversation
@liukun4515 this is what we discuss in #4389 (comment). Also cc @andygrove @alamb |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me -- thank you @ygf11
let equi_expr_count = on.len(); | ||
// The preceding part of expr is equi-exprs, | ||
// and the struct of each equi-expr is like `left-expr = right-expr`. | ||
let new_on:Vec<(Expr,Expr)> = expr.iter().take(equi_expr_count).map(|equi_expr| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should error here if expr
does not has at least equi_expr_count
elements left. Otherwise I think take
will silently return fewer than equi_expr_count
elements, which might result in quite hard to track down bugs
https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.take
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, I added a check -- assert!(expr.len() >= equi_expr_count)
.
assert!(op == &Operator::Eq); | ||
Ok(((**left).clone(), (**right).clone())) | ||
} else { | ||
Err(DataFusionError::Internal(format!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
LogicalPlan::Join(Join { on, filter, .. }) => on | ||
.iter() | ||
.flat_map(|(l, r)| vec![l.clone(), r.clone()]) | ||
.map(|(l, r)| Expr::eq(l.clone(), r.clone())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the fix, right? It then exposes the <l> = <r>
expr to the existing type coercion logic ?
Very nice 👍
let expected = vec![ | ||
"Explain [plan_type:Utf8, plan:Utf8]", | ||
" Projection: t1.t1_id, t1.t1_name, t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t2_id:UInt32;N]", | ||
" Inner Join: CAST(t1.t1_id AS Int64) * Int64(5) = CAST(t2.t2_id AS Int64) Filter: CAST(t1.t1_id AS Int64) * Int64(4) < CAST(t2.t2_id AS Int64) [t1_id:UInt32;N, t1_name:Utf8;N, t2_id:UInt32;N]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
let expected = vec![ | ||
"Explain [plan_type:Utf8, plan:Utf8]", | ||
" Projection: t1.t1_id, t1.t1_name, t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t2_id:UInt32;N]", | ||
" Inner Join: CAST(t1.t1_id AS Int64) + Int64(11) = CAST(t2.t2_id AS Int64) [t1_id:UInt32;N, t1_name:Utf8;N, t2_id:UInt32;N]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think eventually it would be great to have these casts unwrapped too, like
" Inner Join: CAST(t1.t1_id AS Int64) + Int64(11) = CAST(t2.t2_id AS Int64) [t1_id:UInt32;N, t1_name:Utf8;N, t2_id:UInt32;N]", | |
" Inner Join: t1.t1_id + Int32(11) = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t2_id:UInt32;N]", |
To avoid the runtime casting
I am not quite sure why https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/unwrap_cast_in_comparison.rs is not doing so
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This rule just can apply to the pattern
expr `op` literal
cc @alamb
We can file a new issue to discuss this.
I must point out a problem about overflow for add
operation, for example i32::max
+ i32::max
maybe overflow.
@@ -1448,11 +1448,11 @@ async fn hash_join_with_decimal() -> Result<()> { | |||
let state = ctx.state(); | |||
let plan = state.optimize(&plan)?; | |||
let expected = vec![ | |||
"Explain [plan_type:Utf8, plan:Utf8]", | |||
" Projection: t1.c1, t1.c2, t1.c3, t1.c4, t2.c1, t2.c2, t2.c3, t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", | |||
" Right Join: t1.c3 = t2.c3 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happened previously with this plan? Would it error at runtime?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it succeeded without type coercion, but I think it is a coincidence.
The reason is for decimal eq
operation, we only check value, but do not check the precision
and scale
are same.
In this test, each c3
of the two matched rows have the same value, the third
row is 789000
and the fourth
is -12312
, then it succeeded.
"+------------+------------+---------+-----+------------+------------+-----------+---------+",
"| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |",
"+------------+------------+---------+-----+------------+------------+-----------+---------+",
"| | | | | | | 100000.00 | abcdefg |",
"| | | | | | 1970-01-04 | 0.00 | qwerty |",
"| | 1970-01-04 | 789.00 | ghi | 1970-01-04 | | 789.00 | |",
"| 1970-01-04 | | -123.12 | jkl | 1970-01-02 | 1970-01-02 | -123.12 | abc |",
"+------------+------------+---------+-----+------------+------------+-----------+---------+",
async fn join_only_with_filter() -> Result<()> { | ||
let ctx = create_join_context("t1_id", "t2_id", false)?; | ||
|
||
let sql = "select t1.t1_id, t1.t1_name, t2.t2_id from t1 inner join t2 on t1.t1_id * 4 < t2.t2_id"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after #4562 merged, the plan will be converted to NLJ
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in e73faab
@@ -253,9 +253,12 @@ impl LogicalPlan { | |||
aggr_expr, | |||
.. | |||
}) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(), | |||
// There are two part of expression for join, equijoin(on) and non-equijoin(filter). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -478,20 +479,33 @@ pub fn from_plan( | |||
}) => { | |||
let schema = | |||
build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?; | |||
|
|||
let equi_expr_count = on.len(); | |||
// The preceding part of expr is equi-exprs, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @ygf11
I took the liberty of merging this branch with the latest |
Benchmark runs are scheduled for baseline = fddb3d3 and contender = ac2e5d1. ac2e5d1 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Remove this after rebasing on top of commit ac2e5d1 "Support type coercion for equijoin (apache#4666)". It was first released at DF 16.0
Remove this after rebasing on top of commit ac2e5d1 "Support type coercion for equijoin (apache#4666)". It was first released at DF 16.0
Remove this after rebasing on top of commit ac2e5d1 "Support type coercion for equijoin (apache#4666)". It was first released at DF 16.0
Remove this after rebasing on top of commit ac2e5d1 "Support type coercion for equijoin (apache#4666)". It was first released at DF 16.0 ARROW-11838: fix offset buffer in golden file (#60)
Remove this after rebasing on top of commit ac2e5d1 "Support type coercion for equijoin (apache#4666)". It was first released at DF 16.0 ARROW-11838: fix offset buffer in golden file (#60)
Remove this after rebasing on top of commit ac2e5d1 "Support type coercion for equijoin (apache#4666)". It was first released at DF 16.0 ARROW-11838: fix offset buffer in golden file (#60)
Which issue does this PR close?
Closes #2877.
Rationale for this change
See #2877
What changes are included in this PR?
LogicalPlan::expressions()
will combine the two side of equijoin expression to one equality expression, then save the equality expression to vec.type coercion
rule will do type coercion for it.from_plan
method will split each equality expression to two parts, and save them to the new join plan.Are these changes tested?
Yes, tests will cover it.
Are there any user-facing changes?