Commit ebec14e
viirya committed Jan 3, 2024
1 parent b7395a3

Showing 25 changed files with 318 additions and 474 deletions.
228 changes: 84 additions & 144 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs

Large diffs are not rendered by default.
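
In each of the expected plans updated below, the `RepartitionExec: partitioning=RoundRobinBatch(N), input_partitions=1` node that previously sat beneath a `RepartitionExec: partitioning=Hash(..., N)` is gone, and the hash repartition now consumes the single-partition input directly (`input_partitions=1`), so every affected subtree is one exchange shallower. The sketch below only illustrates that before/after plan shape; it is not the enforce_distribution.rs change itself (that diff is not rendered here), and the helper name is hypothetical.

```rust
// Illustrative sketch of the plan-shape change visible in the expected
// outputs below. NOT the enforce_distribution.rs implementation.
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

/// Hypothetical helper: hash-repartition `input` into `n` partitions on `exprs`.
///
/// Before: input (1 partition) -> RoundRobinBatch(n) -> Hash(exprs, n)
/// After:  input (1 partition) -----------------------> Hash(exprs, n)
fn hash_repartition(
    input: Arc<dyn ExecutionPlan>,
    exprs: Vec<Arc<dyn PhysicalExpr>>,
    n: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    // The hash repartition reads the single input partition directly,
    // which is what the updated plans show as `input_partitions=1`.
    Ok(Arc::new(RepartitionExec::try_new(
        input,
        Partitioning::Hash(exprs, n),
    )?))
}
```
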

12 changes: 5 additions & 7 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -660,14 +660,12 @@ async fn test_physical_plan_display_indent_multi_children() {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true",
" RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=9000",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c2]",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true",
" RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c2]",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true",
];

let normalizer = ExplainNormalizer::new();
28 changes: 12 additions & 16 deletions datafusion/core/tests/sql/joins.rs
@@ -124,19 +124,17 @@ async fn join_change_in_planner() -> Result<()> {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
]
};
let mut actual: Vec<&str> = formatted.trim().lines().collect();
// Remove CSV lines
actual.remove(4);
actual.remove(7);
actual.remove(3);
actual.remove(5);

assert_eq!(
expected,
@@ -172,19 +170,17 @@ async fn join_change_in_planner_without_sort() -> Result<()> {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
]
};
let mut actual: Vec<&str> = formatted.trim().lines().collect();
// Remove CSV lines
actual.remove(4);
actual.remove(7);
actual.remove(3);
actual.remove(5);

assert_eq!(
expected,
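
Both join tests above strip the `CsvExec` rows (whose temp-file paths differ per run) from the rendered plan before comparing; with the `RoundRobinBatch` level removed, those rows sit one line higher, so the indices passed to `actual.remove(...)` shift from 4 and 7 to 3 and 5. A position-independent alternative, shown only as a sketch (the helper name is made up; the tests above do not use it), would filter by content instead of index:

```rust
/// Hypothetical helper: drop the path-dependent CsvExec rows by content
/// rather than by fixed index, so the comparison does not depend on how
/// deep the CsvExec nodes sit in the plan.
fn plan_lines_without_csv(formatted: &str) -> Vec<&str> {
    formatted
        .trim()
        .lines()
        .filter(|line| !line.contains("CsvExec"))
        .collect()
}
```
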
18 changes: 8 additions & 10 deletions datafusion/core/tests/sql/order.rs
@@ -208,19 +208,17 @@ ORDER BY 1, 2;
" ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
" RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=1",
" AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
" ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
" RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=1",
" AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
];
let formatted = displayable(plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
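
The order.rs test, like the other plan tests touched here, renders the physical plan with `displayable(plan.as_ref()).indent(true)` and compares it line by line against the expected strings. A minimal self-contained sketch of that assertion pattern (the helper name and failure message are illustrative; `displayable` and `ExecutionPlan` are the DataFusion APIs already used above):

```rust
// Minimal sketch of the line-by-line plan assertion used by these tests.
use std::sync::Arc;

use datafusion::physical_plan::{displayable, ExecutionPlan};

fn assert_plan_matches(plan: &Arc<dyn ExecutionPlan>, expected: &[&str]) {
    // Render the plan as indented text, one operator per line.
    let formatted = displayable(plan.as_ref()).indent(true).to_string();
    let actual: Vec<&str> = formatted.trim().lines().collect();
    assert_eq!(
        expected, actual,
        "expected:\n{expected:#?}\nactual:\n\n{formatted}"
    );
}
```
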
35 changes: 15 additions & 20 deletions datafusion/sqllogictest/test_files/aggregate.slt
@@ -2541,10 +2541,9 @@ GlobalLimitExec: skip=0, fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=1
------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------------MemoryExec: partitions=1, partition_sizes=[1]


query TI
@@ -2596,10 +2595,9 @@ GlobalLimitExec: skip=0, fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=1
------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
--------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
explain select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) desc limit 4;
@@ -2615,10 +2613,9 @@ GlobalLimitExec: skip=0, fetch=4
----SortExec: TopK(fetch=4), expr=[MIN(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=1
------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
--------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
explain select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) asc limit 4;
@@ -2634,10 +2631,9 @@ GlobalLimitExec: skip=0, fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=1
------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
explain select trace_id, MAX(timestamp) from traces group by trace_id order by trace_id asc limit 4;
@@ -2653,10 +2649,9 @@ GlobalLimitExec: skip=0, fetch=4
----SortExec: TopK(fetch=4), expr=[trace_id@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=1
------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------------MemoryExec: partitions=1, partition_sizes=[1]

query TI
select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4;
30 changes: 13 additions & 17 deletions datafusion/sqllogictest/test_files/groupby.slt
@@ -2752,10 +2752,9 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2]
------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=1
------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
--------------MemoryExec: partitions=1, partition_sizes=[1]

query TRR
SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
@@ -2788,10 +2787,9 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2]
------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=1
------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
--------------MemoryExec: partitions=1, partition_sizes=[1]

query TRR
SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
@@ -4008,10 +4006,9 @@ physical_plan
ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)]
--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
----CoalesceBatchesExec: target_batch_size=2
------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8
--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
----------AggregateExec: mode=Partial, gby=[y@1 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
------------MemoryExec: partitions=1, partition_sizes=[1]
------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=1
--------AggregateExec: mode=Partial, gby=[y@1 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
----------MemoryExec: partitions=1, partition_sizes=[1]

query TT
EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE)) FROM t1 GROUP BY y;
@@ -4030,11 +4027,10 @@ ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(
--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as alias1], aggr=[]
------------CoalesceBatchesExec: target_batch_size=2
--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), input_partitions=8
----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS Float64)t1.x@0 as alias1], aggr=[]
--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y]
----------------------MemoryExec: partitions=1, partition_sizes=[1]
--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), input_partitions=1
----------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS Float64)t1.x@0 as alias1], aggr=[]
------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y]
--------------------MemoryExec: partitions=1, partition_sizes=[1]

# create an unbounded table that contains ordered timestamp.
statement ok