-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix and improve CommonSubexprEliminate
rule
#10396
Conversation
… always re-find the correct expression during re-write. (apache#9871)" This reverts commit cd7a00b.
…ds, better JumpMark handling, better variable names, code cleanup, some new todos
cc @alamb, @erratic-pattern and @MohamedAbdeen21 as this PR is related to your recent comments/PRs. |
Possibly related #10333 |
match expr_set.get(&id) { | ||
Some((expr, _, _, symbol)) => { | ||
// todo: check `nullable` | ||
agg_exprs.push(expr.clone().alias(symbol.as_str())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this clone is eliminated in the 3rd commit
// todo: check `nullable` | ||
let field = Field::new(&id, data_type.clone(), true); | ||
fields_set.insert(field.name().to_owned()); | ||
project_exprs.push(expr.clone().alias(symbol.as_str())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this clone is also eliminated
|
||
self.expr_set | ||
.entry(curr_expr_identifier) | ||
.or_insert_with(|| (expr.clone(), 0, data_type, alias_symbol)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this clone is also eliminated
This seems to be a major conflict with #10333
I agree that an identifier shouldn't belong to 2 different expressions, but why does it have to represent a subtree? The expr IS the subtree itself. If we use an identifier like
AFAIK, we only need the identifier to be unique (no collision) for correctness, I don't see why we require the other traits. Edit: Just took a look at the tests, we are basically trying to do the same thing, although your PR is probably more efficient. We only differ on the subtree part I mentioned above. I tried to do |
I will review this one carefully tomorrow morning. |
In the first traversal we need to count how many times we encountered with an Edit:
The issue is not about the aliases we assign to the extracted common expressions, it is about the key of the map where we store the counts. Your PR can be a good improvement to the aliases after this PR, but we need need to fix the key of map first. |
Ok yeah, I see what you mean. But the example you mention with a + b. Doesn't that go away if we fix the side note you mentioned?
|
There are multiple questions here and I don't have the answers for.
I think the best we can do now is to revert #9871 and return to the old chained string representation. (This PR improves the identifier readability a bit by adding |
Although I'd like to find answers to these questions before giving more opinions, I don't mind merging this for now. |
} else { | ||
Ok(Transformed::no(expr)) | ||
} | ||
let (up_index, expr_id) = &self.id_array[self.down_index]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewers might wonder why the code of CommonSubexprRewriter
after the #9871 revert doesn't resemble to the code before that PR: https://github.com/apache/datafusion/pull/9871/files#diff-351499880963d6a383c92e156e75019cd9ce33107724a9635853d7d4cd1898d0L754-L809?
And why the 2 halting conditions (https://github.com/apache/datafusion/pull/9871/files#diff-351499880963d6a383c92e156e75019cd9ce33107724a9635853d7d4cd1898d0L757-L761) are missing from the new code. This is because it turned out that:
- the
self.curr_index >= self.id_array.len()
bound check is poinless as we already indexed the array withself.curr_index
: https://github.com/apache/datafusion/pull/9871/files#diff-351499880963d6a383c92e156e75019cd9ce33107724a9635853d7d4cd1898d0L754.
(Note:curr_index
is calleddown_index
in the new PR.) self.max_series_number > *series_number
is also pointless (and actually there is no point in storingmax_series_number
in the rewriter at all) as*series_number
(calledup_index
in the new code, that is basically the postorder index of the node) can never be smaller than the last replaced expression's postorder index (max_series_number
). This is because in the rewriter we are doing a preorder traversal and a smaller postorder index could appear only in a replaced expression's subtree or in a subtree of a previous child of the node's ancestors.
(BTW, I verified this with adding apanic!
to the halting condition and running all tests.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @peter-toth -- I went over this PR carefully and it looks like a significant improvement to me. In my opinion as long as it doesn't regress performance (I am running benchmarks now) I think we should merge it in as it adds additional test coverage
I also ran our test suite in InfluxDB IOx with this change and all the tests passed .
Doesn't seem to fix reported bug 🤔
I also filed #10413 to track the bug you found (🦅 👁️ ). However, this PR doesn't seem to fix it yet 🤔 . I pushed a test to show this and also tried it manually:
Finished `dev` profile [unoptimized + debuginfo] target(s) in 35.29s
Running `target/debug/datafusion-cli`
DataFusion CLI v37.1.0
> select a + b from (select 1 as a, 2 as b, 1 as "a + b");
+-------+
| a + b |
+-------+
| 1 |
+-------+
1 row(s) fetched.
Elapsed 0.018 seconds.
{}
vs #{}
- I believe @MohamedAbdeen21 used
#{expr}
in make common expression alias human-readable #10333 to follow what is done by DuckDB -- perhaps we could do so too in this PR (I also think#{}
is slightly easier to notice visually than{}
)
EDIT -- now I see @peter-toth 's #10396 (comment) and so maybe we should postpone the identifier update until a follow on PR
/// (a `String`) as `Identifier`. | ||
/// Note that the current implementation contains: | ||
/// - the `Display` of an expression (a `String`) and | ||
/// - the identifiers of the childrens of the expression |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to form such complicated identifiers -- what if we simply use #{}
like in #10333 as suggested by @MohamedAbdeen21 in #10396 (comment)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a comment here: #10396 (comment) that identifiers as keys of the ExprStats
map is a must have but identifiers as aliases of exracted common expressions is not.
/// - The number of occurrences and | ||
/// - The DataType | ||
/// of an expression. | ||
type ExprStats = HashMap<Identifier, (usize, DataType)>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit the code might be easier to follow if this a proper struct and move the manipulation functions (like to_arrays
for example) into methods
Maybe as a follow on PR
/// | ||
/// `affected_id` is updated with any sub expressions that were replaced | ||
fn expr_identifier(expr: &Expr, sub_expr_identifier: Identifier) -> Identifier { | ||
format!("{{{expr}{sub_expr_identifier}}}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see comments about identifiers elsewhere
// Alias this `Column` expr to it original "expr name", | ||
// `projection_push_down` optimizer use "expr name" to eliminate useless | ||
// projections. | ||
// TODO: do we really need to alias here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I removed the alias and several tests failed:
$ cargo test --test sqllogictests
Compiling datafusion-optimizer v37.1.0 (/Users/andrewlamb/Software/datafusion2/datafusion/optimizer)
warning: unused variable: `expr_name`
--> datafusion/optimizer/src/common_subexpr_eliminate.rs:802:17
|
802 | let expr_name = expr.display_name()?;
| ^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_expr_name`
|
= note: `#[warn(unused_variables)]` on by default
Compiling datafusion v37.1.0 (/Users/andrewlamb/Software/datafusion2/datafusion/core)
warning: `datafusion-optimizer` (lib) generated 1 warning
...
Running "map.slt"
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "log(unsigned_integers.b)". Valid fields are a, "log({CAST(unsigned_integers.b AS Float32)|{unsigned_integers.b}})", "log(Int64(10),unsigned_integers.b)".
[SQL] select log(a, 64) a, log(b), log(10, b) from unsigned_integers;
at test_files/scalar.slt:592
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "aggregate_test_100.c2 % Int64(2) = Int64(0)". Valid fields are "{CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0)|{Int64(0)}|{CAST(aggregate_test_100.c2 AS Int64) % Int64(2)|{Int64(2)}|{CAST(aggregate_test_100.c2 AS Int64)|{aggregate_test_100.c2}}}}", "FIRST_VALUE(aggregate_test_100.c2) ORDER BY [CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0) ASC NULLS LAST, aggregate_test_100.c3 DESC NULLS FIRST]", "FIRST_VALUE(aggregate_test_100.c3 - Int64(100)) ORDER BY [CAST(aggregate_test_100.c2 AS Int64) % Int64(2) = Int64(0) ASC NULLS LAST, aggregate_test_100.c3 DESC NULLS FIRST]".
[SQL] SELECT DISTINCT ON (c2 % 2 = 0) c2, c3 - 100 FROM aggregate_test_100 ORDER BY c2 % 2 = 0, c3 DESC;
at test_files/distinct_on.slt:116
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "acos(round(Float64(1) / doubles.f64))". Valid fields are doubles.f64, i64_1, "acos({round(Float64(1) / doubles.f64)|{Float64(1) / doubles.f64|{doubles.f64}|{Float64(1)}}})".
[SQL] select f64, round(1.0 / f64) as i64_1, acos(round(1.0 / f64)) from doubles;
at test_files/expr.slt:2272
External error: query result mismatch:
[SQL] explain select a/2, a/2 + 1 from t
[Diff] (-expected|+actual)
- logical_plan
- 01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
- 02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
- 03)----TableScan: t projection=[a]
at test_files/subquery.slt:1071
External error: query result mismatch:
[SQL] EXPLAIN SELECT x/2, x/2+1 FROM t;
[Diff] (-expected|+actual)
- logical_plan
- 01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
- 02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
- 03)----TableScan: t projection=[x]
- physical_plan
- 01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x / Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
- 02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
- 03)----MemoryExec: partitions=1, partition_sizes=[1]
at test_files/select.slt:1425
External error: query result mismatch:
[SQL] EXPLAIN SELECT c3,
SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
FROM aggregate_test_100
LIMIT 5
[Diff] (-expected|+actual)
logical_plan
01)Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
02)--Limit: skip=0, fetch=5
- 03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ 03)----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
04)------Projection: {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
- 05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ 05)--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
07)------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
physical_plan
01)ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
02)--GlobalLimitExec: skip=0, fetch=5
03)----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)), is_causal: false }]
04)------ProjectionExec: expr=[{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 as {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c3@2 as c3, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
05)--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
06)----------SortPreservingMergeExec: [{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
07)------------SortExec: expr=[{aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 DESC,c9@3 DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
08)--------------ProjectionExec: expr=[c3@1 + c4@2 as {aggregate_test_100.c3 + aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c2@0 as c2, c3@1 as c3, c9@3 as c9]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
10)------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true
at test_files/window.slt:1680
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "hits.ClientIP - Int64(1)". Valid fields are hits."ClientIP", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(1)", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(2)", "{CAST(hits.ClientIP AS Int64)|{hits.ClientIP}} - Int64(3)", "COUNT(*)".
[SQL] SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
at test_files/clickbench.slt:241
External error: query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Schema error: No field named "value_dict.x_dict % Int64(2)". Valid fields are "{CAST(value_dict.x_dict AS Int64)|{value_dict.x_dict}} % Int64(2)", "SUM(value_dict.x_dict)".
[SQL] select sum(x_dict) from value_dict group by x_dict % 2 order by sum(x_dict);
at test_files/aggregate.slt:2696
External error: query result mismatch:
[SQL] EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE)) FROM t1 GROUP BY y;
[Diff] (-expected|+actual)
logical_plan
- 01)Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x)
- 02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
- 03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x AS alias1]], aggr=[[]]
- 04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, t1.y
- 05)--------TableScan: t1 projection=[x, y]
+ 01)Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)
+ 02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(DISTINCT {CAST(t1.x AS Float64)|{t1.x}}) AS SUM(DISTINCT t1.x), MAX(DISTINCT {CAST(t1.x AS Float64)|{t1.x}}) AS MAX(DISTINCT t1.x)]]
+ 03)----Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, t1.y
+ 04)------TableScan: t1 projection=[x, y]
physical_plan
- 01)ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(DISTINCT t1.x)]
- 02)--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
+ 01)ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)]
+ 02)--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
03)----CoalesceBatchesExec: target_batch_size=2
04)------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8
- 05)--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
- 06)----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as alias1], aggr=[]
- 07)------------CoalesceBatchesExec: target_batch_size=2
- 08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), input_partitions=8
- 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
- 10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, {CAST(t1.x AS Float64)|{t1.x}}@0 as alias1], aggr=[]
- 11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as {CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
- 12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
+ 05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+ 06)----------AggregateExec: mode=Partial, gby=[y@1 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
+ 07)------------ProjectionExec: expr=[CAST(x@0 AS Float64) as {CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
+ 08)--------------MemoryExec: partitions=1, partition_sizes=[1]
at test_files/group_by.slt:4184
Error: Execution("9 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`
Caused by:
process didn't exit successfully: `/Users/andrewlamb/Software/datafusion2/target/debug/deps/sqllogictests-ce3a36cfeab74789` (exit status: 1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I see that is needed but it looks weird to me. I mean if we consider the example:
select a + b as x, a + b as y
from ...
that should be eliminated to:
select common as x, common as y
from (
select a + b as common
from ...
)
I.e. we need to add aliases the extracted common expressions, but with this alias here we alias common
and this is what the rule does:
select common as "a + b" as x, common as "a + b" as y
from (
select a + b as common
from ...
)
I wanted to understand why exactly we need to alias at both places.
BTW I left a few other TODOs in the code but none of them means that code should be worse than it was before #9871. Maybe the longer identifiers due to the extra {
, }
and |
can make it a bit slower. I left the TODOs there to myself and others to remember where is some room for improvement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a stashed PR that addresses this issue.
I'm not 100% sure about why the second alias is needed, but there was a comment saying that the second alias is used by another rule, so removing it inside CSE seems to break other rules (or CSE itself sometimes (??) idk, I'm still looking into it).
Anyway, the solution I have at the moment is calling .unalias()
on the expr when applying a new alias through expr.alias()
. That way, when the true alias x
is later (I assume after all the rules finished) given to the expr common as a + b
, it will become common as x
.
It passes the tests, but I really want to understand why it works before pushing the PR + I hope maybe I can find a better way by removing the second alias added by CSE all together, even if it means changing other rules.
@@ -789,6 +861,74 @@ mod test { | |||
assert_eq!(expected, formatted_plan); | |||
} | |||
|
|||
#[test] | |||
fn id_array_visitor() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really nice to see more of how this code works 👍
@@ -1613,6 +1613,14 @@ select count(1) from v; | |||
---- | |||
1 | |||
|
|||
# Ensure CSE resolves columns correctly |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this test but it still resolves the expr as 1 (not 3) 🤔
No, this PR doesn't fix that issue at all. That issue is a resolution issue (#10413) and has nothing to do with CSE. The example I gave in the description doesn't contain any subexpressions to eliminate and The reason I mentioned the resolution issue is because of that issue I couldn't add a test case to this PR which would illustrate the issue of colflicting identifiers in Once #10413 is solved I can add a test case here.
I fully aggree that the current alias is very hard to read and this is because the identifiers are used for aliases as well.
Currently for both 1. and 2. we use the identifier and I'm sure that in 1. we have touse the identifier. In 2. I'm not sure and @MohamedAbdeen21's PR can be a good follow-up improvement. |
Sorry -- I missed that -- updated #10413 to match |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this PR is an improvement:
My performance benchmarks show it appears to be slightly slower than main
logical_plan_tpcds_all 1.00 157.7±1.80ms ? ?/sec 1.03 162.6±2.85ms ? ?/sec
logical_plan_tpch_all 1.00 16.8±0.19ms ? ?/sec 1.02 17.2±0.30ms ? ?/sec
However I expect we'll make this up on subsequent PRs as we fix this rule to avoid the copies
Details
++ critcmp main refactor-commonsubexpreliminate
group main refactor-commonsubexpreliminate
----- ---- -------------------------------
logical_aggregate_with_join 1.00 1198.0±10.58µs ? ?/sec 1.03 1231.8±65.81µs ? ?/sec
logical_plan_tpcds_all 1.00 157.7±1.80ms ? ?/sec 1.03 162.6±2.85ms ? ?/sec
logical_plan_tpch_all 1.00 16.8±0.19ms ? ?/sec 1.02 17.2±0.30ms ? ?/sec
logical_select_all_from_1000 1.05 18.8±0.10ms ? ?/sec 1.00 18.0±0.10ms ? ?/sec
logical_select_one_from_700 1.01 826.5±7.83µs ? ?/sec 1.00 817.5±21.62µs ? ?/sec
logical_trivial_join_high_numbered_columns 1.00 763.6±11.20µs ? ?/sec 1.00 761.5±10.77µs ? ?/sec
logical_trivial_join_low_numbered_columns 1.00 745.5±8.27µs ? ?/sec 1.00 745.3±7.96µs ? ?/sec
physical_plan_tpcds_all 1.00 1351.3±16.05ms ? ?/sec 1.00 1345.2±13.72ms ? ?/sec
physical_plan_tpch_all 1.00 91.3±1.75ms ? ?/sec 1.04 94.5±1.87ms ? ?/sec
physical_plan_tpch_q1 1.00 5.0±0.06ms ? ?/sec 1.05 5.3±0.08ms ? ?/sec
physical_plan_tpch_q10 1.02 4.5±0.06ms ? ?/sec 1.00 4.4±0.10ms ? ?/sec
physical_plan_tpch_q11 1.03 4.0±0.08ms ? ?/sec 1.00 3.9±0.09ms ? ?/sec
physical_plan_tpch_q12 1.07 3.2±0.05ms ? ?/sec 1.00 3.0±0.08ms ? ?/sec
physical_plan_tpch_q13 1.00 2.2±0.04ms ? ?/sec 1.00 2.2±0.05ms ? ?/sec
physical_plan_tpch_q14 1.04 2.8±0.05ms ? ?/sec 1.00 2.7±0.05ms ? ?/sec
physical_plan_tpch_q16 1.00 3.8±0.05ms ? ?/sec 1.03 3.9±0.05ms ? ?/sec
physical_plan_tpch_q17 1.00 3.6±0.12ms ? ?/sec 1.01 3.6±0.05ms ? ?/sec
physical_plan_tpch_q18 1.00 3.9±0.07ms ? ?/sec 1.04 4.0±0.07ms ? ?/sec
physical_plan_tpch_q19 1.01 6.2±0.09ms ? ?/sec 1.00 6.2±0.09ms ? ?/sec
physical_plan_tpch_q2 1.00 7.8±0.09ms ? ?/sec 1.01 7.8±0.09ms ? ?/sec
physical_plan_tpch_q20 1.00 4.6±0.09ms ? ?/sec 1.06 4.8±0.08ms ? ?/sec
physical_plan_tpch_q21 1.00 6.2±0.11ms ? ?/sec 1.00 6.2±0.12ms ? ?/sec
physical_plan_tpch_q22 1.01 3.4±0.08ms ? ?/sec 1.00 3.3±0.08ms ? ?/sec
physical_plan_tpch_q3 1.00 3.1±0.05ms ? ?/sec 1.06 3.3±0.08ms ? ?/sec
physical_plan_tpch_q4 1.00 2.3±0.03ms ? ?/sec 1.11 2.5±0.04ms ? ?/sec
physical_plan_tpch_q5 1.00 4.7±0.15ms ? ?/sec 1.00 4.7±0.04ms ? ?/sec
physical_plan_tpch_q6 1.03 1629.0±90.38µs ? ?/sec 1.00 1578.4±32.11µs ? ?/sec
physical_plan_tpch_q7 1.00 5.9±0.08ms ? ?/sec 1.00 5.9±0.06ms ? ?/sec
physical_plan_tpch_q8 1.00 7.5±0.09ms ? ?/sec 1.00 7.5±0.08ms ? ?/sec
physical_plan_tpch_q9 1.02 5.7±0.08ms ? ?/sec 1.00 5.6±0.06ms ? ?/sec
physical_select_all_from_1000 1.04 61.4±0.28ms ? ?/sec 1.00 59.1±0.37ms ? ?/sec
physical_select_one_from_700 1.07 3.8±0.03ms ? ?/sec 1.00 3.6±0.04ms ? ?/sec
Thanks for the benchmarks @alamb! Maybe the longer identifiers can explain that gap. |
@alamb, IMO if this PR can be merged then the next steps should be:
I'm happy to take 4 as I already worked on it a bit, but unfortunately I have very little time to work on this project lately so I can't take 1. and 2. |
I'll rebase my PR this weekend. I do have other changes in mind regarding plan readability. If 1 is still available by the time I'm done, I'll be happy to take a look at it. |
Agree -- this is now tracked as its own issue and we can deal with it separately
I will do this
Sounds like @MohamedAbdeen21 is going to do this maybe this weekend
👍
That would be amazing -- thank you @peter-toth -- I filed #10426 to track |
All right, I think we have our next steps outlined and tracked with tickets. 🚀 ! |
Thanks again @peter-toth and @MohamedAbdeen21 |
Thanks for the review! |
* Revert "fix(9870): common expression elimination optimization, should always re-find the correct expression during re-write. (apache#9871)" This reverts commit cd7a00b. * expr id should always contain the full expr structure, cleaner expr ids, better JumpMark handling, better variable names, code cleanup, some new todos * move `Expr` from `expr_set`s to `affected_id`s * better naming, docs fixes * introduce `CommonExprs` type alias, minor todo fix * add test --------- Co-authored-by: Andrew Lamb <[email protected]>
Which issue does this PR close?
Part of #9873.
Rationale for this change
This PR started as part of #9873 to reduce number of
Expr
clones but after some investigation it shifted to be a fix for the rule's correctness issues.The current
CommonSubexprEliminate
was refactored in fix(9870): common expression elimination optimization, should always re-find the correct expression during re-write. #9871 to remove theIdArray
cache and simplify the identifier of expresions. Unfortunately that change doesn't seem to be correct. The source of the issue is that an idenfifier needs to represent an expression subtreee and the newly chosen "stringified expr" as identifier doesn't seem to fulfill that purpose. E.g. an identifier shouldn't belong to 2 different expressions:Sidenote:
Actually I wanted to show that correctness issue of the current
CommonSubexprEliminate
in a test, but when I wrote a test with colliding column names I run into a different issue, that DataFusion resolves thecol("a") + col("b")
expression as if it wascol("a + b")
if ana + b
field exists in the schema.This is a different issue (not related to
CommonSubexprEliminate
at all) and can be easily reproduced:So in this the first commit of PR I revert fix(9870): common expression elimination optimization, should always re-find the correct expression during re-write. #9871.
Then I investigated what is the actual purpose of
Identifier
s, why don't we use a simpleHashMap<Expr, (usize, DataType, Identifier)>
asExprSet
? It is clear that we need to generate a unique alias for the extracted common expressions, but why is the key of the map is anIdentifier
and not&Expr
orExpr
itself. And actually it turned out that the reasons are already explained in the comments.If we used
Expr
as the key of the map computing thehash()
of the keys would require traversing on the wholeExpr
, which can be very costly asExpr
s contain indirections to subexpressions asBox<Expr>
orVec<Expr>
.Using special identifiers to represent
Expr
trees and caching those identifiers by the preorder visit indexes inIdArray
should significantly speed up the second top-down traversal that does the actual expression rewrite.Sidenote: the current long
String
identifiers are also not a good choice. We need to revisit this in a follow-up PR and choose something like(usize, &Expr)
tuple as identifiers. The first element of a tuple is a pre-calculatedhash()
of an expression tree, that is built-up during the first bottom-up traversal. And the referece to expression is there to implement theeq()
.The second commit is a refactor and fix of the algorithm as reverting Stop copying
Expr
s and LogicalPlans so much during Common Subexpression Elimination #9873 caused the Common expression elimination should also re-find the correct expression, during re-write. #9870 issue to resurface. This is a major refactor but I think the code ofExprIdentifierVisitor
andCommonSubexprRewriter
became much cleaner.The 3rd commit eliminates some
Expr
clones inExprSet
s.The 4th and 5th commit contain only renames and docs fixes. I think
ExprStats
is a better name forExprSet
as the purpose of that data structure is store the counts. Also, IMOCommonExpr
/common_exprs
is a better name foraffected_id
to store the common expressions that got extracted.What changes are included in this PR?
Please see above.
Are these changes tested?
Yes, with existing UTs.
Are there any user-facing changes?
No.