Skip to content

Commit

Permalink
Update explain plan to show TopK operator (#7826)
Browse files Browse the repository at this point in the history
* Updated sort.rs

solves: #7750

Replaced `SortExec: fetch={fetch}, expr=[{}]` with 'SortExec: TopK(fetch={fetch}), expr=[{}]' in [sort.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion/physical-plan/src/sorts/sort.rs) file

* fix: ci

---------

Co-authored-by: Pratibhanu Jarngal <[email protected]>
  • Loading branch information
haohuaijin and Night-Amber3301 authored Oct 15, 2023
1 parent 8208ff0 commit 26e43ac
Show file tree
Hide file tree
Showing 13 changed files with 28 additions and 32 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ async fn test_physical_plan_display_indent() {
let expected = vec![
"GlobalLimitExec: skip=0, fetch=10",
" SortPreservingMergeExec: [the_min@2 DESC], fetch=10",
" SortExec: fetch=10, expr=[the_min@2 DESC]",
" SortExec: TopK(fetch=10), expr=[the_min@2 DESC]",
" ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn group_by_limit() -> Result<()> {
let physical_plan = dataframe.create_physical_plan().await?;
let mut expected_physical_plan = r#"
GlobalLimitExec: skip=0, fetch=4
SortExec: fetch=4, expr=[MAX(traces.ts)@1 DESC]
SortExec: TopK(fetch=4), expr=[MAX(traces.ts)@1 DESC]
AggregateExec: mode=Single, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.ts)], lim=[4]
"#.trim().to_string();
let actual_phys_plan =
Expand Down
6 changes: 1 addition & 5 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,7 @@ impl DisplayAs for SortExec {
let expr = PhysicalSortExpr::format_list(&self.expr);
match self.fetch {
Some(fetch) => {
write!(
f,
// TODO should this say topk?
"SortExec: fetch={fetch}, expr=[{expr}]",
)
write!(f, "SortExec: TopK(fetch={fetch}), expr=[{expr}]",)
}
None => write!(f, "SortExec: expr=[{expr}]"),
}
Expand Down
10 changes: 5 additions & 5 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2338,7 +2338,7 @@ Limit: skip=0, fetch=4
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4
----SortExec: fetch=4, expr=[MAX(traces.timestamp)@1 DESC]
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
Expand Down Expand Up @@ -2393,7 +2393,7 @@ Limit: skip=0, fetch=4
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4
----SortExec: fetch=4, expr=[MAX(traces.timestamp)@1 DESC]
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
Expand All @@ -2412,7 +2412,7 @@ Limit: skip=0, fetch=4
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MIN(traces.timestamp)@1 DESC], fetch=4
----SortExec: fetch=4, expr=[MIN(traces.timestamp)@1 DESC]
----SortExec: TopK(fetch=4), expr=[MIN(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
Expand All @@ -2431,7 +2431,7 @@ Limit: skip=0, fetch=4
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 ASC NULLS LAST], fetch=4
----SortExec: fetch=4, expr=[MAX(traces.timestamp)@1 ASC NULLS LAST]
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
Expand All @@ -2450,7 +2450,7 @@ Limit: skip=0, fetch=4
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [trace_id@0 ASC NULLS LAST], fetch=4
----SortExec: fetch=4, expr=[trace_id@0 ASC NULLS LAST]
----SortExec: TopK(fetch=4), expr=[trace_id@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/topk.slt
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Limit: skip=0, fetch=5
----TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
physical_plan
GlobalLimitExec: skip=0, fetch=5
--SortExec: fetch=5, expr=[c13@12 DESC]
--SortExec: TopK(fetch=5), expr=[c13@12 DESC]
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true


Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q10.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [revenue@2 DESC], fetch=10
----SortExec: fetch=10, expr=[revenue@2 DESC]
----SortExec: TopK(fetch=10), expr=[revenue@2 DESC]
------ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@7 as revenue, c_acctbal@2 as c_acctbal, n_name@4 as n_name, c_address@5 as c_address, c_phone@3 as c_phone, c_comment@6 as c_comment]
--------AggregateExec: mode=FinalPartitioned, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@2 as c_acctbal, c_phone@3 as c_phone, n_name@4 as n_name, c_address@5 as c_address, c_comment@6 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
----------CoalesceBatchesExec: target_batch_size=8192
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q11.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [value@1 DESC], fetch=10
----SortExec: fetch=10, expr=[value@1 DESC]
----SortExec: TopK(fetch=10), expr=[value@1 DESC]
------ProjectionExec: expr=[ps_partkey@0 as ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value]
--------NestedLoopJoinExec: join_type=Inner, filter=CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Decimal128(38, 15)) > SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)@1
----------AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as ps_partkey], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q13.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC], fetch=10
----SortExec: fetch=10, expr=[custdist@1 DESC,c_count@0 DESC]
----SortExec: TopK(fetch=10), expr=[custdist@1 DESC,c_count@0 DESC]
------ProjectionExec: expr=[c_count@0 as c_count, COUNT(*)@1 as custdist]
--------AggregateExec: mode=FinalPartitioned, gby=[c_count@0 as c_count], aggr=[COUNT(*)]
----------CoalesceBatchesExec: target_batch_size=8192
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q16.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST], fetch=10
----SortExec: fetch=10, expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
----SortExec: TopK(fetch=10), expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
------ProjectionExec: expr=[group_alias_0@0 as part.p_brand, group_alias_1@1 as part.p_type, group_alias_2@2 as part.p_size, COUNT(alias1)@3 as supplier_cnt]
--------AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, group_alias_1@1 as group_alias_1, group_alias_2@2 as group_alias_2], aggr=[COUNT(alias1)]
----------CoalesceBatchesExec: target_batch_size=8192
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q2.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST], fetch=10
----SortExec: fetch=10, expr=[s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST]
----SortExec: TopK(fetch=10), expr=[s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST]
------ProjectionExec: expr=[s_acctbal@5 as s_acctbal, s_name@2 as s_name, n_name@8 as n_name, p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_address@3 as s_address, s_phone@4 as s_phone, s_comment@6 as s_comment]
--------CoalesceBatchesExec: target_batch_size=8192
----------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0, ps_partkey@1), (ps_supplycost@7, MIN(partsupp.ps_supplycost)@0)]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q3.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST], fetch=10
----SortExec: fetch=10, expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
----SortExec: TopK(fetch=10), expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
------ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
--------AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
----------CoalesceBatchesExec: target_batch_size=8192
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q9.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC], fetch=10
----SortExec: fetch=10, expr=[nation@0 ASC NULLS LAST,o_year@1 DESC]
----SortExec: TopK(fetch=10), expr=[nation@0 ASC NULLS LAST,o_year@1 DESC]
------ProjectionExec: expr=[nation@0 as nation, o_year@1 as o_year, SUM(profit.amount)@2 as sum_profit]
--------AggregateExec: mode=FinalPartitioned, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)]
----------CoalesceBatchesExec: target_batch_size=8192
Expand Down
Loading

0 comments on commit 26e43ac

Please sign in to comment.