-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-11991: [Rust][DataFusion] Maintain partition information in Union #9806
Changes from all commits
1072d28
2fbdc80
5dc8034
f2c3571
d907df8
22892ba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,15 +60,31 @@ impl ExecutionPlan for UnionExec { | |
|
||
/// Output of the union is the combination of all output partitions of the inputs | ||
fn output_partitioning(&self) -> Partitioning { | ||
// Sums all the output partitions | ||
let num_partitions = self | ||
.inputs | ||
let intial: Option<Partitioning> = None; | ||
self.inputs | ||
.iter() | ||
.map(|plan| plan.output_partitioning().partition_count()) | ||
.sum(); | ||
// TODO: this loses partitioning info in case of same partitioning scheme (for example `Partitioning::Hash`) | ||
// https://issues.apache.org/jira/browse/ARROW-11991 | ||
Partitioning::UnknownPartitioning(num_partitions) | ||
.fold(intial, |acc, input| { | ||
match (acc, input.output_partitioning()) { | ||
(None, partition) => Some(partition), | ||
( | ||
Some(Partitioning::Hash(mut vector_acc, size_acc)), | ||
Partitioning::Hash(vector, size), | ||
) => { | ||
vector_acc.append(&mut vector.clone()); | ||
Some(Partitioning::Hash(vector_acc, size_acc + size)) | ||
} | ||
( | ||
Some(Partitioning::RoundRobinBatch(size_acc)), | ||
Partitioning::RoundRobinBatch(size), | ||
) => Some(Partitioning::RoundRobinBatch(size_acc + size)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the output batch size might be more precisely described as though I am unsure of the requirements on a operator that produces There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So it doesn't say anything about this batch size, only he number of partitions. |
||
(Some(partition_acc), partition) => { | ||
Some(Partitioning::UnknownPartitioning( | ||
partition_acc.partition_count() + partition.partition_count(), | ||
)) | ||
} | ||
} | ||
}) | ||
.unwrap() | ||
} | ||
|
||
fn with_new_children( | ||
|
@@ -99,17 +115,17 @@ impl ExecutionPlan for UnionExec { | |
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use crate::physical_plan::expressions::Column; | ||
use crate::physical_plan::{ | ||
collect, | ||
csv::{CsvExec, CsvReadOptions}, | ||
repartition::RepartitionExec, | ||
}; | ||
use crate::test; | ||
use arrow::record_batch::RecordBatch; | ||
|
||
#[tokio::test] | ||
async fn test_union_partitions() -> Result<()> { | ||
fn get_csv_exec() -> Result<(CsvExec, CsvExec)> { | ||
let schema = test::aggr_test_schema(); | ||
|
||
// Create csv's with different partitioning | ||
let path = test::create_partitioned_csv("aggregate_test_100.csv", 4)?; | ||
let path2 = test::create_partitioned_csv("aggregate_test_100.csv", 5)?; | ||
|
@@ -129,15 +145,105 @@ mod tests { | |
1024, | ||
None, | ||
)?; | ||
Ok((csv, csv2)) | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_union_partitions_unknown() -> Result<()> { | ||
let (csv, csv2) = get_csv_exec()?; | ||
|
||
let union_exec = Arc::new(UnionExec::new(vec![Arc::new(csv), Arc::new(csv2)])); | ||
|
||
// Should have 9 partitions and 9 output batches | ||
assert_eq!(union_exec.output_partitioning().partition_count(), 9); | ||
assert!(matches!( | ||
union_exec.output_partitioning(), | ||
Partitioning::UnknownPartitioning(9) | ||
)); | ||
|
||
let result: Vec<RecordBatch> = collect(union_exec).await?; | ||
assert_eq!(result.len(), 9); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_union_partitions_hash() -> Result<()> { | ||
let (csv, csv2) = get_csv_exec()?; | ||
let repartition = RepartitionExec::try_new( | ||
Arc::new(csv), | ||
Partitioning::Hash(vec![Arc::new(Column::new("c1"))], 5), | ||
)?; | ||
let repartition2 = RepartitionExec::try_new( | ||
Arc::new(csv2), | ||
Partitioning::Hash(vec![Arc::new(Column::new("c2"))], 5), | ||
)?; | ||
|
||
let union_exec = Arc::new(UnionExec::new(vec![ | ||
Arc::new(repartition), | ||
Arc::new(repartition2), | ||
])); | ||
|
||
// should be hash, have 10 partitions and 45 output batches | ||
assert!(matches!( | ||
union_exec.output_partitioning(), | ||
Partitioning::Hash(_, 10) | ||
)); | ||
|
||
let result: Vec<RecordBatch> = collect(union_exec).await?; | ||
assert_eq!(result.len(), 45); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_union_partitions_round_robin() -> Result<()> { | ||
let (csv, csv2) = get_csv_exec()?; | ||
let repartition = | ||
RepartitionExec::try_new(Arc::new(csv), Partitioning::RoundRobinBatch(4))?; | ||
let repartition2 = | ||
RepartitionExec::try_new(Arc::new(csv2), Partitioning::RoundRobinBatch(6))?; | ||
|
||
let union_exec = Arc::new(UnionExec::new(vec![ | ||
Arc::new(repartition), | ||
Arc::new(repartition2), | ||
])); | ||
|
||
// should be hash, have 10 partitions and 9 output batches | ||
assert!(matches!( | ||
union_exec.output_partitioning(), | ||
Partitioning::RoundRobinBatch(10) | ||
)); | ||
|
||
let result: Vec<RecordBatch> = collect(union_exec).await?; | ||
assert_eq!(result.len(), 9); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_union_partitions_mix() -> Result<()> { | ||
let (csv, csv2) = get_csv_exec()?; | ||
let repartition = RepartitionExec::try_new( | ||
Arc::new(csv), | ||
Partitioning::Hash(vec![Arc::new(Column::new("c1"))], 5), | ||
)?; | ||
let repartition2 = | ||
RepartitionExec::try_new(Arc::new(csv2), Partitioning::RoundRobinBatch(6))?; | ||
|
||
let union_exec = Arc::new(UnionExec::new(vec![ | ||
Arc::new(repartition), | ||
Arc::new(repartition2), | ||
])); | ||
|
||
// should be hash, have 11 partitions and 25 output batches | ||
assert!(matches!( | ||
union_exec.output_partitioning(), | ||
Partitioning::UnknownPartitioning(11) | ||
)); | ||
|
||
let result: Vec<RecordBatch> = collect(union_exec).await?; | ||
assert_eq!(result.len(), 25); | ||
|
||
Ok(()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure this is correct -- of the input is hash partitioned by some expression across
size_acc
partitions, the output will only be hash partitioned if the hash expressions are the same and the number of output partitions is the same, right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so too. Maybe just look for the first input (there is always at least 2 in union) and compare with the others to have same expression and same nr of partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to make sure I am on the same page:
Two
Partitioning::Hash
may only be composed if they have the same expression and number of partitions. If the twoPartitioning::Hash
differ in expression and/or number of partitions then it becomes aUnknownPartitioning
. Correct?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is my understanding
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb I am having trouble figuring out how to check for equality between
PhysicalExpr
. Do I need to implement aPartialEq
or is there a more clever way?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 I suspect we will need to implement
PartialEq
(which you may be able to do via#[derive(PartialEq)]
annotation onPhysicalExpr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am still new to rust, so sorry for the beginner questions. Thank you for your patience.
Since
PhysicalExpr
is a trait and not a struct or enum I am unable to use#[derive(PartialEq)]
. Additionally I am not able to do the following:pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq
. My guess for why this doesn't work is the same reason thederive
isn't working, which is thatPhysicalExpr
isn't a struct or enum. Do you know how to require all objects that implement a trait also implementPatialEq
or is there a different way?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have expected
To be the correct way as it says that everything that implements
PhysicalExpr
also needs to implementPartialEq
. You would then have to#[derive(PartialEq)]
orimpl PartialEq
for all structs that claim toimpl PhysicalExpr
What error did you get when you tried that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I added
PartialEq
and ensured all structs that implPhysicalExpr
alsoimpl PartialEq
. For every occurrence ofArc<dyn PhysicalExpr>
I get the following error, 132 times:It appears that the inclusion of PartialEq makes it so
PhysicalExpr
doesn't play nicely with dynamic dispatch. Due to PartialEq taking a type arg ofSelf
, which can't be determined under dynamic dispatch. I am not sure how to work around this.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 it seems stackoverflow has some context
Perhap we need to implement our own version of equality for
PhysicalExpr
or something.I am not sure how to proceed, to be honest. @Dandandan any thoughts?