From 1072d28702125b45840713774e74a063ff879e21 Mon Sep 17 00:00:00 2001 From: jacobbaumbachBaumbach Date: Thu, 25 Mar 2021 22:45:34 -0700 Subject: [PATCH 1/4] Update output_partitioning for union to preserve Hash. Still needs a test for the hash case. --- rust/datafusion/src/physical_plan/union.rs | 26 +++++++++++++--------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/rust/datafusion/src/physical_plan/union.rs b/rust/datafusion/src/physical_plan/union.rs index cbab728a8428b..f91ab87e0ef4f 100644 --- a/rust/datafusion/src/physical_plan/union.rs +++ b/rust/datafusion/src/physical_plan/union.rs @@ -25,7 +25,7 @@ use std::{any::Any, sync::Arc}; use arrow::datatypes::SchemaRef; -use super::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; +use super::{ExecutionPlan, Partitioning, PhysicalExpr, SendableRecordBatchStream}; use crate::error::Result; use async_trait::async_trait; @@ -60,15 +60,21 @@ impl ExecutionPlan for UnionExec { /// Output of the union is the combination of all output partitions of the inputs fn output_partitioning(&self) -> Partitioning { - // Sums all the output partitions - let num_partitions = self - .inputs - .iter() - .map(|plan| plan.output_partitioning().partition_count()) - .sum(); - // TODO: this loses partitioning info in case of same partitioning scheme (for example `Partitioning::Hash`) - // https://issues.apache.org/jira/browse/ARROW-11991 - Partitioning::UnknownPartitioning(num_partitions) + let intial: std::result::Result<(Vec>, usize), usize> = + Ok((vec![], 0)); + let input = self.inputs.iter().fold(intial, |acc, plan| { + match (acc, plan.output_partitioning()) { + (Ok((mut v, s)), Partitioning::Hash(vp, sp)) => { + v.append(&mut vp.clone()); + Ok((v, s + sp)) + } + (Ok((_, s)), p) | (Err(s), p) => Err(s + p.partition_count()), + } + }); + match input { + Ok((v, s)) => Partitioning::Hash(v, s), + Err(s) => Partitioning::UnknownPartitioning(s), + } } fn with_new_children( From 2fbdc80d1b0d576776e6901e1af82a2e0900bb8f Mon Sep 17 00:00:00 2001 From: jacobbaumbachBaumbach Date: Thu, 1 Apr 2021 22:50:49 -0700 Subject: [PATCH 2/4] add hash test --- rust/datafusion/src/physical_plan/union.rs | 46 ++++++++++++++++++++-- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/physical_plan/union.rs b/rust/datafusion/src/physical_plan/union.rs index f91ab87e0ef4f..5ad544012b37b 100644 --- a/rust/datafusion/src/physical_plan/union.rs +++ b/rust/datafusion/src/physical_plan/union.rs @@ -105,17 +105,17 @@ impl ExecutionPlan for UnionExec { #[cfg(test)] mod tests { use super::*; + use crate::physical_plan::expressions::Column; use crate::physical_plan::{ collect, csv::{CsvExec, CsvReadOptions}, + repartition::RepartitionExec, }; use crate::test; use arrow::record_batch::RecordBatch; - #[tokio::test] - async fn test_union_partitions() -> Result<()> { + fn get_csv_exec() -> Result<(CsvExec, CsvExec)> { let schema = test::aggr_test_schema(); - // Create csv's with different partitioning let path = test::create_partitioned_csv("aggregate_test_100.csv", 4)?; let path2 = test::create_partitioned_csv("aggregate_test_100.csv", 5)?; @@ -135,15 +135,53 @@ mod tests { 1024, None, )?; + Ok((csv, csv2)) + } + + #[tokio::test] + async fn test_union_partitions() -> Result<()> { + let (csv, csv2) = get_csv_exec()?; let union_exec = Arc::new(UnionExec::new(vec![Arc::new(csv), Arc::new(csv2)])); // Should have 9 partitions and 9 output batches - assert_eq!(union_exec.output_partitioning().partition_count(), 9); + assert!(matches!( + union_exec.output_partitioning(), + Partitioning::UnknownPartitioning(9) + )); let result: Vec = collect(union_exec).await?; assert_eq!(result.len(), 9); Ok(()) } + + #[tokio::test] + async fn test_union_partitions_hash() -> Result<()> { + let (csv, csv2) = get_csv_exec()?; + let repartition = RepartitionExec::try_new( + Arc::new(csv), + Partitioning::Hash(vec![Arc::new(Column::new("c1"))], 5), + )?; + let repartition2 = RepartitionExec::try_new( + Arc::new(csv2), + Partitioning::Hash(vec![Arc::new(Column::new("c2"))], 5), + )?; + + let union_exec = Arc::new(UnionExec::new(vec![ + Arc::new(repartition), + Arc::new(repartition2), + ])); + + // should be hash, have 10 partitions and 45 output batches + assert!(matches!( + union_exec.output_partitioning(), + Partitioning::Hash(_, 10) + )); + + let result: Vec = collect(union_exec).await?; + assert_eq!(result.len(), 45); + + Ok(()) + } } From 5dc8034f8ced6bdd1c58f70c60ee73047e8730dd Mon Sep 17 00:00:00 2001 From: jacobbaumbachBaumbach Date: Fri, 2 Apr 2021 12:27:27 -0700 Subject: [PATCH 3/4] handle RoundRobin and add tests for this case and mixed partitioning --- rust/datafusion/src/physical_plan/union.rs | 92 ++++++++++++++++++---- 1 file changed, 77 insertions(+), 15 deletions(-) diff --git a/rust/datafusion/src/physical_plan/union.rs b/rust/datafusion/src/physical_plan/union.rs index 5ad544012b37b..5e41ba7b542e3 100644 --- a/rust/datafusion/src/physical_plan/union.rs +++ b/rust/datafusion/src/physical_plan/union.rs @@ -25,7 +25,7 @@ use std::{any::Any, sync::Arc}; use arrow::datatypes::SchemaRef; -use super::{ExecutionPlan, Partitioning, PhysicalExpr, SendableRecordBatchStream}; +use super::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; use crate::error::Result; use async_trait::async_trait; @@ -60,21 +60,31 @@ impl ExecutionPlan for UnionExec { /// Output of the union is the combination of all output partitions of the inputs fn output_partitioning(&self) -> Partitioning { - let intial: std::result::Result<(Vec>, usize), usize> = - Ok((vec![], 0)); - let input = self.inputs.iter().fold(intial, |acc, plan| { - match (acc, plan.output_partitioning()) { - (Ok((mut v, s)), Partitioning::Hash(vp, sp)) => { - v.append(&mut vp.clone()); - Ok((v, s + sp)) + let intial: Option = None; + self.inputs + .iter() + .fold(intial, |acc, input| { + match (acc, input.output_partitioning()) { + (None, partition) => Some(partition), + ( + Some(Partitioning::Hash(mut vector_acc, size_acc)), + Partitioning::Hash(vector, size), + ) => { + vector_acc.append(&mut vector.clone()); + Some(Partitioning::Hash(vector_acc, size_acc + size)) + } + ( + Some(Partitioning::RoundRobinBatch(size_acc)), + Partitioning::RoundRobinBatch(size), + ) => Some(Partitioning::RoundRobinBatch(size_acc + size)), + (Some(partition_acc), partition) => { + Some(Partitioning::UnknownPartitioning( + partition_acc.partition_count() + partition.partition_count(), + )) + } } - (Ok((_, s)), p) | (Err(s), p) => Err(s + p.partition_count()), - } - }); - match input { - Ok((v, s)) => Partitioning::Hash(v, s), - Err(s) => Partitioning::UnknownPartitioning(s), - } + }) + .unwrap() } fn with_new_children( @@ -184,4 +194,56 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_union_partitions_round_robin() -> Result<()> { + let (csv, csv2) = get_csv_exec()?; + let repartition = + RepartitionExec::try_new(Arc::new(csv), Partitioning::RoundRobinBatch(4))?; + let repartition2 = + RepartitionExec::try_new(Arc::new(csv2), Partitioning::RoundRobinBatch(6))?; + + let union_exec = Arc::new(UnionExec::new(vec![ + Arc::new(repartition), + Arc::new(repartition2), + ])); + + // should be hash, have 10 partitions and 9 output batches + assert!(matches!( + union_exec.output_partitioning(), + Partitioning::RoundRobinBatch(10) + )); + + let result: Vec = collect(union_exec).await?; + assert_eq!(result.len(), 9); + + Ok(()) + } + + #[tokio::test] + async fn test_union_partitions_round_mix() -> Result<()> { + let (csv, csv2) = get_csv_exec()?; + let repartition = RepartitionExec::try_new( + Arc::new(csv), + Partitioning::Hash(vec![Arc::new(Column::new("c1"))], 5), + )?; + let repartition2 = + RepartitionExec::try_new(Arc::new(csv2), Partitioning::RoundRobinBatch(6))?; + + let union_exec = Arc::new(UnionExec::new(vec![ + Arc::new(repartition), + Arc::new(repartition2), + ])); + + // should be hash, have 11 partitions and 25 output batches + assert!(matches!( + union_exec.output_partitioning(), + Partitioning::UnknownPartitioning(11) + )); + + let result: Vec = collect(union_exec).await?; + assert_eq!(result.len(), 25); + + Ok(()) + } } From d907df8aef83ff234396f9297dcd9309f6f520fb Mon Sep 17 00:00:00 2001 From: jacobbaumbachBaumbach Date: Fri, 2 Apr 2021 13:17:21 -0700 Subject: [PATCH 4/4] update test names to be consistent --- rust/datafusion/src/physical_plan/union.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/union.rs b/rust/datafusion/src/physical_plan/union.rs index 5e41ba7b542e3..0f3da9c10d2a6 100644 --- a/rust/datafusion/src/physical_plan/union.rs +++ b/rust/datafusion/src/physical_plan/union.rs @@ -149,7 +149,7 @@ mod tests { } #[tokio::test] - async fn test_union_partitions() -> Result<()> { + async fn test_union_partitions_unknown() -> Result<()> { let (csv, csv2) = get_csv_exec()?; let union_exec = Arc::new(UnionExec::new(vec![Arc::new(csv), Arc::new(csv2)])); @@ -221,7 +221,7 @@ mod tests { } #[tokio::test] - async fn test_union_partitions_round_mix() -> Result<()> { + async fn test_union_partitions_mix() -> Result<()> { let (csv, csv2) = get_csv_exec()?; let repartition = RepartitionExec::try_new( Arc::new(csv),