ARROW-11991: [Rust][DataFusion] Maintain partition information in Union #9806
…test for the hash case.
@Dandandan do you know of a good example test where the underlying |
Codecov Report
@@ Coverage Diff @@
## master #9806 +/- ##
==========================================
+ Coverage 82.68% 82.70% +0.01%
==========================================
Files 256 256
Lines 60105 60167 +62
==========================================
+ Hits 49698 49760 +62
Misses 10407 10407
Continue to review full report at Codecov.
You are right, it is pretty new and unused. There is a lower level test for the In the DataFrame api you can pass arrow/rust/datafusion/src/dataframe.rs Lines 190 to 207 in a2a9f5d
If nothing currently uses hash partitioning, perhaps we can remove it from the enum for now?
At least @andygrove was planning to add partitioned hash joins for use in Ballista, so the addition to be able to use it in combination with union would be useful?
Yes, partitioned hash joins are needed to make Ballista scale with large data sets. I hope to start work on that pretty soon. It would be easier with the donation PR merged.
Sorry for the slow progress. Will work on this tomorrow. I am new to arrow and realized my initial question was poorly worded. After reviewing the code I believe I can set a |
@Dandandan Does it make sense to also maintain |
Less valuable indeed, I think, but for consistency, debugging, etc. it is good to maintain both 👍
Awesome. I included it and tested that case. Once I get CI/CD to pass I will mark this as ready. Thank you for your help!
I am hitting the following error in CI/CD, as seen from the logs here. An issue has already been created for this here, which #9879 appears to fix. Once #9879 is merged I am pretty confident CI/CD will pass.
@jacobBaumbach the "no space left on device" error is not caused by this PR
    Some(Partitioning::Hash(mut vector_acc, size_acc)),
    Partitioning::Hash(vector, size),
) => {
    vector_acc.append(&mut vector.clone());
I am not sure this is correct -- if the input is hash partitioned by some expression across size_acc partitions, the output will only be hash partitioned if the hash expressions are the same and the number of output partitions is the same, right?
I think so too. Maybe just take the first input (there are always at least two in a union) and check that the others have the same expression and the same number of partitions.
Just to make sure I am on the same page:
Two Partitioning::Hash may only be composed if they have the same expression and number of partitions. If the two Partitioning::Hash differ in expression and/or number of partitions then it becomes an UnknownPartitioning. Correct?
That is my understanding
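The rule agreed on above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the PR's code: the `Partitioning` enum here is a simplified stand-in that stores hash expressions as strings (so equality is trivially available), and `union_partitioning` and `partition_count` are hypothetical names. Which partition count to report when all inputs share the same hash partitioning is not settled in this thread; the sketch keeps the shared value unchanged as an assumption.

```rust
// Simplified stand-in for DataFusion's `Partitioning`; illustrative only.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    // The real variant holds physical expressions; strings are used here
    // (an assumption) so that `==` works without extra machinery.
    Hash(Vec<String>, usize),
    UnknownPartitioning(usize),
}

impl Partitioning {
    // Number of partitions described by this scheme.
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::RoundRobinBatch(n)
            | Partitioning::Hash(_, n)
            | Partitioning::UnknownPartitioning(n) => *n,
        }
    }
}

// Hypothetical helper: keep `Hash` only when every input is hash
// partitioned by the same expressions over the same number of partitions;
// otherwise fall back to `UnknownPartitioning` with the summed count.
fn union_partitioning(inputs: &[Partitioning]) -> Option<Partitioning> {
    let first = inputs.first()?;
    let all_same_hash = matches!(first, Partitioning::Hash(_, _))
        && inputs.iter().all(|p| p == first);
    if all_same_hash {
        Some(first.clone())
    } else {
        let total: usize = inputs.iter().map(Partitioning::partition_count).sum();
        Some(Partitioning::UnknownPartitioning(total))
    }
}
```

With two inputs both described by `Hash(["a"], 4)` the helper returns that same value; if the expressions or counts differ, or one input is not hash partitioned, it returns `UnknownPartitioning` with the total number of input partitions.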
@alamb I am having trouble figuring out how to check for equality between PhysicalExpr. Do I need to implement PartialEq, or is there a more clever way?
🤔 I suspect we will need to implement PartialEq (which you may be able to do via a #[derive(PartialEq)] annotation on PhysicalExpr).
I am still new to Rust, so sorry for the beginner questions. Thank you for your patience.
Since PhysicalExpr is a trait and not a struct or enum, I am unable to use #[derive(PartialEq)]. Additionally, I am not able to do the following: pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq. My guess for why this doesn't work is the same reason the derive isn't working, which is that PhysicalExpr isn't a struct or enum. Do you know how to require that all objects that implement a trait also implement PartialEq, or is there a different way?
I would have expected
pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq
to be the correct way, as it says that everything that implements PhysicalExpr also needs to implement PartialEq. You would then have to #[derive(PartialEq)] or impl PartialEq for all structs that claim to impl PhysicalExpr.
What error did you get when you tried that?
So I added PartialEq and ensured all structs that impl PhysicalExpr also impl PartialEq. For every occurrence of Arc<dyn PhysicalExpr> I get the following error, 132 times:
error[E0038]: the trait `PhysicalExpr` cannot be made into an object
  --> datafusion/src/physical_plan/udf.rs:95:17
   |
95 |     args: &[Arc<dyn PhysicalExpr>],
   |                 ^^^^^^^^^^^^^^^^ `PhysicalExpr` cannot be made into an object
   |
note: for a trait to be "object safe" it needs to allow building a vtable to allow the call to be resolvable dynamically; for more information visit <https://doc.rust-lang.org/reference/items/traits.html#object-safety>
   --> datafusion/src/physical_plan/mod.rs:188:57
    |
188 | pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq {
    |           ------------                                  ^^^^^^^^^ ...because it uses `Self` as a type parameter
    |           |
    |           this trait cannot be made into an object...
It appears that including PartialEq means PhysicalExpr no longer plays nicely with dynamic dispatch, because PartialEq takes a type argument of Self, which can't be determined under dynamic dispatch. I am not sure how to work around this.
🤔 It seems Stack Overflow has some context.
Perhaps we need to implement our own version of equality for PhysicalExpr or something.
I am not sure how to proceed, to be honest. @Dandandan any thoughts?
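One commonly used way to get "our own version of equality" on a trait object is an object-safe comparison method combined with downcasting through `std::any::Any`. The sketch below is a minimal illustration of that pattern and is not taken from this PR: the pared-down `PhysicalExpr` trait, the `dyn_eq` method, the `Column` and `Literal` structs, and the `same_exprs` helper are all hypothetical names chosen for the example.

```rust
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

// Hypothetical, pared-down stand-in for DataFusion's `PhysicalExpr`.
trait PhysicalExpr: Debug + Send + Sync {
    // Expose the concrete value so callers can downcast it.
    fn as_any(&self) -> &dyn Any;
    // Object-safe equality: the argument is a trait object, not `Self`,
    // so the trait can still be used as `dyn PhysicalExpr`.
    fn dyn_eq(&self, other: &dyn PhysicalExpr) -> bool;
}

#[derive(Debug, PartialEq)]
struct Column {
    name: String,
}

impl PhysicalExpr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn dyn_eq(&self, other: &dyn PhysicalExpr) -> bool {
        // Equal only if `other` is also a `Column` and the fields match.
        other
            .as_any()
            .downcast_ref::<Column>()
            .map_or(false, |o| self == o)
    }
}

#[derive(Debug, PartialEq)]
struct Literal {
    value: i64,
}

impl PhysicalExpr for Literal {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn dyn_eq(&self, other: &dyn PhysicalExpr) -> bool {
        other
            .as_any()
            .downcast_ref::<Literal>()
            .map_or(false, |o| self == o)
    }
}

// Hypothetical helper: compare two expression lists element by element.
fn same_exprs(a: &[Arc<dyn PhysicalExpr>], b: &[Arc<dyn PhysicalExpr>]) -> bool {
    a.len() == b.len() && a.iter().zip(b).all(|(x, y)| x.dyn_eq(y.as_ref()))
}
```

Because `dyn_eq` never mentions `Self` in its signature, the trait stays object safe, and each concrete type can still derive `PartialEq` for comparisons against its own type. The cost is that every implementor has to write the small downcasting shim.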
(
    Some(Partitioning::RoundRobinBatch(size_acc)),
    Partitioning::RoundRobinBatch(size),
) => Some(Partitioning::RoundRobinBatch(size_acc + size)),
I think the output batch size might be more precisely described as max(size_acc, size), though I am unsure of the requirements on an operator that produces RoundRobinBatch(1000) -- does that mean all batches must be 1000? The docs on Partitioning are not super clear.
RoundRobinBatch(1000) means it still has 1000 partitions as output after repartitioning with round robin.
So it doesn't say anything about the batch size, only the number of partitions.
The implementation of the Union produces the sum of the input partitions as output, so that each partition can be executed in parallel. If we have 10 partitions in a and 20 in b, a UNION b will have 30 partitions.
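The partition arithmetic described above can be sketched as follows. Both function names are hypothetical, and the index mapping assumes that the union simply concatenates its inputs' partitions in input order, which this thread does not spell out.

```rust
// Number of output partitions of a union: the sum over all inputs.
fn union_partition_count(input_partition_counts: &[usize]) -> usize {
    input_partition_counts.iter().sum()
}

// Map an output partition index of the union back to
// (input index, partition index within that input), assuming the inputs'
// partitions are laid out one after another in input order.
fn locate_partition(
    input_partition_counts: &[usize],
    mut partition: usize,
) -> Option<(usize, usize)> {
    for (input, &count) in input_partition_counts.iter().enumerate() {
        if partition < count {
            return Some((input, partition));
        }
        partition -= count;
    }
    // The index is past the last output partition.
    None
}
```

For the example in the comment, `union_partition_count(&[10, 20])` gives 30, and under the stated layout assumption output partition 10 corresponds to partition 0 of b.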
The Apache Arrow Rust community is moving the Rust implementation into its own dedicated GitHub repositories, arrow-rs and arrow-datafusion. It is likely we will not merge this PR into this repository. Please see the mailing-list thread for more details. We expect the process to take a few days and will follow up with a migration plan for the in-flight PRs.
#10096 has removed the arrow implementation from this repository (it now resides in https://github.com/apache/arrow-rs and https://github.com/apache/arrow-datafusion) in the hopes of streamlining the development process. Please re-target this PR (let us know if you need help doing so) to one or both of the new repositories. Thank you for understanding and helping to make arrow-rs and datafusion better.
Update output_partitioning for union to preserve Hash. Still needs a test for the hash case.