ARROW-11991: [Rust][DataFusion] Maintain partition information in Union #9806

Closed
wants to merge 6 commits
130 changes: 118 additions & 12 deletions rust/datafusion/src/physical_plan/union.rs
@@ -60,15 +60,31 @@ impl ExecutionPlan for UnionExec {

/// Output of the union is the combination of all output partitions of the inputs
fn output_partitioning(&self) -> Partitioning {
// Sums all the output partitions
let num_partitions = self
.inputs
let initial: Option<Partitioning> = None;
self.inputs
.iter()
.map(|plan| plan.output_partitioning().partition_count())
.sum();
// TODO: this loses partitioning info in case of same partitioning scheme (for example `Partitioning::Hash`)
// https://issues.apache.org/jira/browse/ARROW-11991
Partitioning::UnknownPartitioning(num_partitions)
.fold(initial, |acc, input| {
match (acc, input.output_partitioning()) {
(None, partition) => Some(partition),
(
Some(Partitioning::Hash(mut vector_acc, size_acc)),
Partitioning::Hash(vector, size),
) => {
vector_acc.append(&mut vector.clone());
Contributor:

I am not sure this is correct -- if the input is hash partitioned by some expression across size_acc partitions, the output will only be hash partitioned if the hash expressions are the same and the number of output partitions is the same, right?

Contributor:

I think so too. Maybe just take the first input (there are always at least two inputs in a union) and compare it with the others to check that they have the same expressions and the same number of partitions.

Author:

Just to make sure I am on the same page:

Two Partitioning::Hash partitionings may only be combined if they have the same expression and number of partitions. If they differ in expression and/or number of partitions, the result becomes an UnknownPartitioning. Correct?

Contributor:

That is my understanding
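
To make the rule concrete, here is a minimal sketch of the check proposed above (illustrative only, not code from this PR): take the first input's partitioning and verify that every other input hashes on the same expressions into the same number of partitions. The helper exprs_equal is hypothetical -- how to compare PhysicalExprs is exactly the open question in the rest of this thread -- and DataFusion's Arc, ExecutionPlan and Partitioning are assumed to be in scope.

fn all_inputs_share_hash_partitioning(inputs: &[Arc<dyn ExecutionPlan>]) -> bool {
    let mut partitionings = inputs.iter().map(|plan| plan.output_partitioning());
    match partitionings.next() {
        // Compare every remaining input against the first one.
        Some(Partitioning::Hash(first_exprs, first_count)) => partitionings.all(|p| match p {
            Partitioning::Hash(exprs, count) => {
                count == first_count && exprs_equal(&exprs, &first_exprs)
            }
            _ => false,
        }),
        // No inputs, or the first input is not hash partitioned.
        _ => false,
    }
}

If this check passes, the union could keep reporting the inputs' Hash partitioning; otherwise it would fall back to UnknownPartitioning with the summed partition count.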

Author:

@alamb I am having trouble figuring out how to check for equality between PhysicalExprs. Do I need to implement PartialEq, or is there a more clever way?

Contributor:

🤔 I suspect we will need to implement PartialEq (which you may be able to do via a #[derive(PartialEq)] annotation on PhysicalExpr).

Author (@jacobBaumbach, Apr 8, 2021):

I am still new to Rust, so sorry for the beginner questions. Thank you for your patience.

Since PhysicalExpr is a trait and not a struct or enum, I am unable to use #[derive(PartialEq)]. Additionally, I am not able to do the following: pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq. My guess for why this doesn't work is the same reason the derive isn't working: PhysicalExpr isn't a struct or enum. Do you know how to require that all objects implementing a trait also implement PartialEq, or is there a different way?

Contributor:

I would have expected

pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq

to be the correct way, as it says that everything that implements PhysicalExpr also needs to implement PartialEq. You would then have to #[derive(PartialEq)] or impl PartialEq for all structs that claim to impl PhysicalExpr.

What error did you get when you tried that?

Author:

So I added PartialEq and ensured all structs that impl PhysicalExpr also impl PartialEq. For every occurrence of Arc<dyn PhysicalExpr> I get the following error, 132 times:

error[E0038]: the trait `PhysicalExpr` cannot be made into an object
   --> datafusion/src/physical_plan/udf.rs:95:17
    |
95  |     args: &[Arc<dyn PhysicalExpr>],
    |                 ^^^^^^^^^^^^^^^^ `PhysicalExpr` cannot be made into an object
    |
note: for a trait to be "object safe" it needs to allow building a vtable to allow the call to be resolvable dynamically; for more information visit <https://doc.rust-lang.org/reference/items/traits.html#object-safety>
   --> datafusion/src/physical_plan/mod.rs:188:57
    |
188 | pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq {
    |           ------------                                  ^^^^^^^^^ ...because it uses `Self` as a type parameter
    |           |
    |           this trait cannot be made into an object...

It appears that including PartialEq prevents PhysicalExpr from being used with dynamic dispatch: PartialEq takes Self as a type parameter, which cannot be resolved behind a trait object. I am not sure how to work around this.

Contributor:

🤔 It seems Stack Overflow has some context.

Perhaps we need to implement our own version of equality for PhysicalExpr or something.

I am not sure how to proceed, to be honest. @Dandandan any thoughts?
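
One common Rust pattern for "our own version of equality" (a sketch only, under the assumption that a dedicated method is added rather than a PartialEq supertrait; not what this PR implements) is to keep the trait object safe and route comparison through Any-based downcasting. The trait and the Column struct below are simplified stand-ins for DataFusion's real types:

use std::any::Any;
use std::fmt::Debug;

// Simplified stand-in: the real PhysicalExpr has more supertraits and methods.
pub trait PhysicalExpr: Debug {
    /// Expose the concrete type so implementations can be downcast.
    fn as_any(&self) -> &dyn Any;
    /// Object-safe equality against another trait object.
    fn dyn_eq(&self, other: &dyn PhysicalExpr) -> bool;
}

#[derive(Debug, PartialEq)]
pub struct Column {
    name: String,
}

impl PhysicalExpr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn dyn_eq(&self, other: &dyn PhysicalExpr) -> bool {
        // Equal only if `other` is also a Column with the same fields.
        other
            .as_any()
            .downcast_ref::<Column>()
            .map_or(false, |o| self == o)
    }
}

Two Arc<dyn PhysicalExpr> values could then be compared with a.dyn_eq(b.as_ref()) without PhysicalExpr ever naming PartialEq<Self>, which is what breaks object safety above.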

Some(Partitioning::Hash(vector_acc, size_acc + size))
}
(
Some(Partitioning::RoundRobinBatch(size_acc)),
Partitioning::RoundRobinBatch(size),
) => Some(Partitioning::RoundRobinBatch(size_acc + size)),
Contributor:

I think the output batch size might be more precisely described as max(size_acc, size),

though I am unsure of the requirements on an operator that produces RoundRobinBatch(1000) -- does that mean all batches must be 1000? The docs on Partitioning are not super clear.

Contributor:

RoundRobinBatch(1000) means it still has 1000 partitions as output after repartitioning with round robin.

Contributor (@Dandandan, Apr 4, 2021):

So it doesn't say anything about the batch size, only the number of partitions.
The implementation of the Union produces the sum of the input partitions as output, so each partition can be executed in parallel: if we have 10 partitions in a and 20 in b, a UNION b will have 30 partitions.
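
As a concrete illustration of that composition rule (a hypothetical snippet reusing the operators from this PR; plan_a and plan_b stand for two arbitrary Arc<dyn ExecutionPlan> inputs):

// The union's partition count is the sum of its inputs' partition counts,
// so every input partition can still be executed in parallel.
let a_parts = plan_a.output_partitioning().partition_count(); // e.g. 10
let b_parts = plan_b.output_partitioning().partition_count(); // e.g. 20
let union_exec = UnionExec::new(vec![plan_a.clone(), plan_b.clone()]);
assert_eq!(
    union_exec.output_partitioning().partition_count(),
    a_parts + b_parts // 30 partitions for `a UNION b`
);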

(Some(partition_acc), partition) => {
Some(Partitioning::UnknownPartitioning(
partition_acc.partition_count() + partition.partition_count(),
))
}
}
})
.unwrap()
}

fn with_new_children(
@@ -99,17 +115,17 @@ impl ExecutionPlan for UnionExec {
#[cfg(test)]
mod tests {
use super::*;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::{
collect,
csv::{CsvExec, CsvReadOptions},
repartition::RepartitionExec,
};
use crate::test;
use arrow::record_batch::RecordBatch;

#[tokio::test]
async fn test_union_partitions() -> Result<()> {
fn get_csv_exec() -> Result<(CsvExec, CsvExec)> {
let schema = test::aggr_test_schema();

// Create CSVs with different partitioning
let path = test::create_partitioned_csv("aggregate_test_100.csv", 4)?;
let path2 = test::create_partitioned_csv("aggregate_test_100.csv", 5)?;
@@ -129,15 +145,105 @@ mod tests {
1024,
None,
)?;
Ok((csv, csv2))
}

#[tokio::test]
async fn test_union_partitions_unknown() -> Result<()> {
let (csv, csv2) = get_csv_exec()?;

let union_exec = Arc::new(UnionExec::new(vec![Arc::new(csv), Arc::new(csv2)]));

// Should have 9 partitions and 9 output batches
assert_eq!(union_exec.output_partitioning().partition_count(), 9);
assert!(matches!(
union_exec.output_partitioning(),
Partitioning::UnknownPartitioning(9)
));

let result: Vec<RecordBatch> = collect(union_exec).await?;
assert_eq!(result.len(), 9);

Ok(())
}

#[tokio::test]
async fn test_union_partitions_hash() -> Result<()> {
let (csv, csv2) = get_csv_exec()?;
let repartition = RepartitionExec::try_new(
Arc::new(csv),
Partitioning::Hash(vec![Arc::new(Column::new("c1"))], 5),
)?;
let repartition2 = RepartitionExec::try_new(
Arc::new(csv2),
Partitioning::Hash(vec![Arc::new(Column::new("c2"))], 5),
)?;

let union_exec = Arc::new(UnionExec::new(vec![
Arc::new(repartition),
Arc::new(repartition2),
]));

// should be hash, have 10 partitions and 45 output batches
assert!(matches!(
union_exec.output_partitioning(),
Partitioning::Hash(_, 10)
));

let result: Vec<RecordBatch> = collect(union_exec).await?;
assert_eq!(result.len(), 45);

Ok(())
}

#[tokio::test]
async fn test_union_partitions_round_robin() -> Result<()> {
let (csv, csv2) = get_csv_exec()?;
let repartition =
RepartitionExec::try_new(Arc::new(csv), Partitioning::RoundRobinBatch(4))?;
let repartition2 =
RepartitionExec::try_new(Arc::new(csv2), Partitioning::RoundRobinBatch(6))?;

let union_exec = Arc::new(UnionExec::new(vec![
Arc::new(repartition),
Arc::new(repartition2),
]));

// should be round robin, have 10 partitions and 9 output batches
assert!(matches!(
union_exec.output_partitioning(),
Partitioning::RoundRobinBatch(10)
));

let result: Vec<RecordBatch> = collect(union_exec).await?;
assert_eq!(result.len(), 9);

Ok(())
}

#[tokio::test]
async fn test_union_partitions_mix() -> Result<()> {
let (csv, csv2) = get_csv_exec()?;
let repartition = RepartitionExec::try_new(
Arc::new(csv),
Partitioning::Hash(vec![Arc::new(Column::new("c1"))], 5),
)?;
let repartition2 =
RepartitionExec::try_new(Arc::new(csv2), Partitioning::RoundRobinBatch(6))?;

let union_exec = Arc::new(UnionExec::new(vec![
Arc::new(repartition),
Arc::new(repartition2),
]));

// should be unknown partitioning, have 11 partitions and 25 output batches
assert!(matches!(
union_exec.output_partitioning(),
Partitioning::UnknownPartitioning(11)
));

let result: Vec<RecordBatch> = collect(union_exec).await?;
assert_eq!(result.len(), 25);

Ok(())
}
}