Commit
Move repartition_file_scans out of enable_round_robin check in `EnforceDistribution` rule (#8731)

* Cleanup

* More

* Restore add_roundrobin_on_top

* Restore test files

* More

* Restore

* More

* More

* Make test stable

* For review

* Add test
viirya authored Jan 5, 2024
1 parent 0208755 commit 29f23eb
Showing 6 changed files with 45 additions and 31 deletions.
42 changes: 18 additions & 24 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1197,32 +1197,33 @@ fn ensure_distribution(
)
.map(
|(mut child, requirement, required_input_ordering, would_benefit, maintains)| {
-// Don't need to apply when the returned row count is not greater than 1:
+// Don't need to apply when the returned row count is not greater than batch size
let num_rows = child.plan.statistics()?.num_rows;
let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) {
num_rows
.get_value()
.map(|value| value > &batch_size)
-.unwrap_or(true)
+.unwrap() // safe to unwrap since is_exact() is true
} else {
true
};

+// When `repartition_file_scans` is set, attempt to increase
+// parallelism at the source.
+if repartition_file_scans && repartition_beneficial_stats {
+if let Some(new_child) =
+child.plan.repartitioned(target_partitions, config)?
+{
+child.plan = new_child;
+}
+}
+
if enable_round_robin
// Operator benefits from partitioning (e.g. filter):
&& (would_benefit && repartition_beneficial_stats)
// Unless partitioning doesn't increase the partition count, it is not beneficial:
&& child.plan.output_partitioning().partition_count() < target_partitions
{
-// When `repartition_file_scans` is set, attempt to increase
-// parallelism at the source.
-if repartition_file_scans {
-if let Some(new_child) =
-child.plan.repartitioned(target_partitions, config)?
-{
-child.plan = new_child;
-}
-}
// Increase parallelism by adding round-robin repartitioning
// on top of the operator. Note that we only do this if the
// partition count is not already equal to the desired partition
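Read together, the hunk above changes the order of operations: the statistics check feeds both decisions, file-scan repartitioning now runs before and independently of the `enable_round_robin` gate, and only the round-robin step keeps the `partition_count < target_partitions` guard. A minimal, self-contained sketch of that flow, using toy types rather than DataFusion's `ExecutionPlan` and statistics APIs:

```rust
/// Toy stand-in for a plan node; `exact_num_rows` models an exact row-count
/// statistic, `None` models an inexact or unknown one. Not DataFusion's types.
struct PlanSketch {
    exact_num_rows: Option<usize>,
    partition_count: usize,
}

/// Mirrors `repartition_beneficial_stats`: with an exact row count, repartition
/// only when it exceeds the batch size; otherwise assume it is worthwhile.
fn repartition_beneficial(plan: &PlanSketch, batch_size: usize) -> bool {
    plan.exact_num_rows
        .map(|rows| rows > batch_size)
        .unwrap_or(true)
}

fn ensure_distribution_sketch(
    plan: &mut PlanSketch,
    enable_round_robin: bool,
    repartition_file_scans: bool,
    would_benefit: bool,
    target_partitions: usize,
    batch_size: usize,
) {
    let beneficial = repartition_beneficial(plan, batch_size);

    // Step 1 (moved out of the round-robin gate): try to parallelize the
    // source scan whenever `repartition_file_scans` is on and stats allow it.
    if repartition_file_scans && beneficial {
        plan.partition_count = plan.partition_count.max(target_partitions);
    }

    // Step 2: round-robin repartitioning stays behind its own flag and only
    // fires when it would actually raise the partition count.
    if enable_round_robin
        && would_benefit
        && beneficial
        && plan.partition_count < target_partitions
    {
        plan.partition_count = target_partitions;
    }
}

fn main() {
    // With round robin disabled, the scan itself can still be split up.
    let mut scan = PlanSketch { exact_num_rows: None, partition_count: 1 };
    ensure_distribution_sketch(&mut scan, false, true, false, 4, 8192);
    assert_eq!(scan.partition_count, 4);
}
```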
@@ -1361,17 +1362,10 @@ impl DistributionContext {

fn update_children(mut self) -> Result<Self> {
for child_context in self.children_nodes.iter_mut() {
-child_context.distribution_connection = match child_context.plan.as_any() {
-plan_any if plan_any.is::<RepartitionExec>() => matches!(
-plan_any
-.downcast_ref::<RepartitionExec>()
-.unwrap()
-.partitioning(),
-Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _)
-),
-plan_any
-if plan_any.is::<SortPreservingMergeExec>()
-|| plan_any.is::<CoalescePartitionsExec>() =>
+child_context.distribution_connection = match &child_context.plan {
+plan if is_repartition(plan)
+|| is_coalesce_partitions(plan)
+|| is_sort_preserving_merge(plan) =>
{
true
}
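The `update_children` hunk swaps the manual `as_any()` downcasts for the existing `is_repartition`, `is_coalesce_partitions`, and `is_sort_preserving_merge` helpers, so every `RepartitionExec` now marks a distribution connection rather than only round-robin or hash partitioned ones. A toy sketch of the resulting match shape, with an assumed enum standing in for `Arc<dyn ExecutionPlan>` and the other arms collapsed to `false`:

```rust
// Assumed toy enum standing in for Arc<dyn ExecutionPlan>; the real code uses
// the is_* helpers from enforce_distribution.rs on actual operator types.
enum NodeKind {
    Repartition,
    CoalescePartitions,
    SortPreservingMerge,
    Other,
}

fn is_repartition(node: &NodeKind) -> bool {
    matches!(node, NodeKind::Repartition)
}
fn is_coalesce_partitions(node: &NodeKind) -> bool {
    matches!(node, NodeKind::CoalescePartitions)
}
fn is_sort_preserving_merge(node: &NodeKind) -> bool {
    matches!(node, NodeKind::SortPreservingMerge)
}

// Same match shape as the new `update_children`: one guard arm covers the three
// operators that establish a distribution connection; the remaining arms of the
// real match (elided in the hunk above) are reduced to `false` here.
fn distribution_connection(node: &NodeKind) -> bool {
    match node {
        n if is_repartition(n)
            || is_coalesce_partitions(n)
            || is_sort_preserving_merge(n) =>
        {
            true
        }
        _ => false,
    }
}

fn main() {
    assert!(distribution_connection(&NodeKind::Repartition));
    assert!(distribution_connection(&NodeKind::CoalescePartitions));
    assert!(distribution_connection(&NodeKind::SortPreservingMerge));
    assert!(!distribution_connection(&NodeKind::Other));
}
```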
@@ -3870,14 +3864,14 @@ pub(crate) mod tests {
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Plan already has two partitions
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e]",
"ParquetExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e]",
];
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Plan already has two partitions
"CsvExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], has_header=false",
"CsvExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], has_header=false",
];

assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10);
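The expected plans change because the scan is now offered to `repartitioned()` regardless of the round-robin gate, so the file groups are printed with explicit byte ranges such as `x:0..100` instead of bare file names. A small illustrative helper (my assumption, not DataFusion's actual `FileGroupPartitioner`) showing the even byte-range split behind displays like the `0..101, 101..202, 202..303, 303..403` grouping in `repartition_scan.slt` below:

```rust
use std::ops::Range;

/// Assumed illustration: slice a file of `len` bytes into `groups` evenly
/// sized byte ranges, the way the expected plans print them.
fn split_file(len: u64, groups: u64) -> Vec<Range<u64>> {
    // Ceil division so every group but the last has the same size: 403 / 4 -> 101.
    let chunk = (len + groups - 1) / groups;
    (0..groups)
        .map(|i| (i * chunk)..((i + 1) * chunk).min(len))
        .collect()
}

fn main() {
    // A 100-byte file kept as one whole-file group is printed as x:0..100.
    assert_eq!(split_file(100, 1), vec![0..100u64]);
    // A single 403-byte parquet file split 4 ways matches repartition_scan.slt.
    let expected: Vec<Range<u64>> = vec![0..101, 101..202, 202..303, 303..403];
    assert_eq!(split_file(403, 4), expected);
}
```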
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/arrow_typeof.slt
@@ -375,4 +375,4 @@ select arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)');
query T
select arrow_typeof(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'));
----
-LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })
+LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })
24 changes: 22 additions & 2 deletions datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -63,6 +63,26 @@ CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1

+# disable round robin repartitioning
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = false;
+
+## Expect to see the scan read the file as "4" groups with even sizes (offsets) again
+query TT
+EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42;
+----
+logical_plan
+Filter: parquet_table.column1 != Int32(42)
+--TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: column1@0 != 42
+----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
+
+# enable round robin repartitioning again
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = true;
+
# create a second parquet file
statement ok
COPY (VALUES (100), (200)) TO 'test_files/scratch/repartition_scan/parquet_table/1.parquet'
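The new sqllogictest block above pins down the behavior this change enables: with `datafusion.optimizer.enable_round_robin_repartition = false`, the Parquet scan is still split into 4 byte-range file groups. A rough sketch of exercising the same settings from Rust; the file path is hypothetical and the `SessionConfig`/`SessionContext` usage is my assumption of the standard API surface, not code from this PR:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Assumed configuration: 4 target partitions, round-robin repartitioning
    // disabled, file-scan repartitioning left at its default (enabled).
    let config = SessionConfig::new()
        .with_target_partitions(4)
        .set_bool("datafusion.optimizer.enable_round_robin_repartition", false);
    let ctx = SessionContext::new_with_config(config);

    // Hypothetical path standing in for the scratch parquet file in the test.
    ctx.register_parquet("parquet_table", "/tmp/2.parquet", ParquetReadOptions::default())
        .await?;

    // The physical plan should still show a ParquetExec with 4 byte-range file groups.
    ctx.sql("EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42")
        .await?
        .show()
        .await?;
    Ok(())
}
```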
@@ -147,7 +167,7 @@ WITH HEADER ROW
LOCATION 'test_files/scratch/repartition_scan/csv_table/';

query I
-select * from csv_table;
+select * from csv_table ORDER BY column1;
----
1
2
@@ -190,7 +210,7 @@ STORED AS json
LOCATION 'test_files/scratch/repartition_scan/json_table/';

query I
select * from "json_table";
select * from "json_table" ORDER BY column1;
----
1
2
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/timestamps.slt
@@ -1862,7 +1862,7 @@ SELECT to_timestamp(null) is null as c1,
----
true true true true true true true true true true true true true

-# verify timestamp output types
+# verify timestamp output types
query TTT
SELECT arrow_typeof(to_timestamp(1)), arrow_typeof(to_timestamp(null)), arrow_typeof(to_timestamp('2023-01-10 12:34:56.000'))
----
@@ -1880,7 +1880,7 @@ SELECT arrow_typeof(to_timestamp(1)) = arrow_typeof(1::timestamp) as c1,
true true true true true true

# known issues. currently overflows (expects default precision to be microsecond instead of nanoseconds. Work pending)
-#verify extreme values
+#verify extreme values
#query PPPPPPPP
#SELECT to_timestamp(-62125747200), to_timestamp(1926632005177), -62125747200::timestamp, 1926632005177::timestamp, cast(-62125747200 as timestamp), cast(1926632005177 as timestamp)
#----
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q2.slt.part
Expand Up @@ -238,7 +238,7 @@ order by
p_partkey
limit 10;
----
-9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9 33-258-202-4782 s the slyly even ideas poach fluffily
+9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9 33-258-202-4782 s the slyly even ideas poach fluffily
9508.37 Supplier#000000070 FRANCE 3563 Manufacturer#1 INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express ideas. ironic ideas haggle about the final T
9508.37 Supplier#000000070 FRANCE 17268 Manufacturer#4 INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express ideas. ironic ideas haggle about the final T
9453.01 Supplier#000000802 ROMANIA 10021 Manufacturer#5 ,6HYXb4uaHITmtMBj4Ak57Pd 29-342-882-6463 gular frets. permanently special multipliers believe blithely alongs
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/window.slt
Expand Up @@ -3794,7 +3794,7 @@ select a,
1 1
2 1

-# support scalar value in ORDER BY
+# support scalar value in ORDER BY
query I
select rank() over (order by 1) rnk from (select 1 a union all select 2 a) x
----
