
ARROW-11991: [Rust][DataFusion] Maintain partition information in Union #9806

Closed
wants to merge 6 commits

Conversation

jacobBaumbach

Update output_partitioning for union to preserve Hash. Still needs a test for the hash case.



jacobBaumbach commented Mar 26, 2021

@Dandandan do you know of a good example test where the underlying Partitioning is of type Hash? I am having trouble finding one in the codebase, which is preventing me from creating a test for the Hash case. Thanks!


codecov-io commented Mar 26, 2021

Codecov Report

Merging #9806 (22892ba) into master (2b87dfc) will increase coverage by 0.01%.
The diff coverage is 97.10%.


@@            Coverage Diff             @@
##           master    #9806      +/-   ##
==========================================
+ Coverage   82.68%   82.70%   +0.01%     
==========================================
  Files         256      256              
  Lines       60105    60167      +62     
==========================================
+ Hits        49698    49760      +62     
  Misses      10407    10407              
Impacted Files Coverage Δ
rust/datafusion/src/physical_plan/union.rs 94.11% <97.10%> (+6.61%) ⬆️
rust/parquet/src/encodings/encoding.rs 95.05% <0.00%> (+0.19%) ⬆️

Continue to review full report at Codecov.

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2b87dfc...22892ba. Read the comment docs.

@Dandandan
Contributor

@Dandandan do you know of a good example test where the underlying Partitioning is of type Hash? I am having trouble finding one in the codebase, which is preventing me from creating a test for the Hash case. Thanks!

You are right, it is pretty new and unused. There is a lower level test for the Hash partitioning but not much else is using it yet.

In the DataFrame API you can pass .repartition(Partitioning::Hash(expr)).
See the relevant code/example for RoundRobinBatch here:

/// Repartition a DataFrame based on a logical partitioning scheme.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
/// # Ok(())
/// # }
/// ```
fn repartition(
    &self,
    partitioning_scheme: Partitioning,
) -> Result<Arc<dyn DataFrame>>;
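
For the Hash case the same pattern should apply. Here is a hedged sketch, not taken from the codebase: it assumes the logical Partitioning::Hash variant takes the hash expressions plus a target partition count, and the column name is only illustrative.

// Hedged sketch: repartition a DataFrame by hash instead of round robin.
use datafusion::prelude::*;
use datafusion::error::Result;

fn hash_repartition_example() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
    // Hash partition on column "a" into 4 partitions (column name is illustrative).
    let _df1 = df.repartition(Partitioning::Hash(vec![col("a")], 4))?;
    Ok(())
}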


alamb commented Apr 1, 2021

If nothing currently uses hash partitioning, perhaps we can remove it from the enum for now?

@Dandandan
Contributor

If nothing currently uses hash partitioning, perhaps we can remove it from the enum for now?

At least @andygrove was planning to add partitioned hash joins for use in Ballista, so this addition, making it possible to use Hash partitioning in combination with union, would be useful?

@andygrove
Member

Yes, partitioned hash joins are needed to make Ballista scale with large data sets. I hope to start work on that pretty soon. It would be easier with the donation PR merged.

@jacobBaumbach
Author

Sorry for the slow progress. Will work on this tomorrow. I am new to Arrow and realized my initial question was poorly worded. After reviewing the code I believe I can set a Hash partition by wrapping the CsvExec in a RepartitionExec, which I can then pass to UnionExec.
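
Roughly, the setup described above might look like this (a hedged sketch, not the PR's actual test: exact constructor signatures and import paths may differ, and col here stands for the physical column expression helper):

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::physical_plan::{
    expressions::col, repartition::RepartitionExec, union::UnionExec, ExecutionPlan, Partitioning,
};

// Wrap an input plan (e.g. a CsvExec) in a RepartitionExec so its output is hash
// partitioned, then union two such inputs; the PR's goal is for the union to report
// a Hash output partitioning here instead of UnknownPartitioning.
fn hash_partitioned_union(csv: Arc<dyn ExecutionPlan>) -> Result<Arc<UnionExec>> {
    let hashed: Arc<dyn ExecutionPlan> = Arc::new(RepartitionExec::try_new(
        csv,
        Partitioning::Hash(vec![col("a")], 4),
    )?);
    Ok(Arc::new(UnionExec::new(vec![hashed.clone(), hashed])))
}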

@jacobBaumbach
Author

@Dandandan Does it make sense to also maintain RoundRobin? Based on my understanding it doesn't seem like there is a value add, but like I mentioned above I am new to the project.

@Dandandan
Contributor

@Dandandan Does it make sense to also maintain RoundRobin? Based on my understanding it doesn't seem like there is a value add, but like I mentioned above I am new to the project.

Less valuable I think indeed, but for consistency/debugging/etc I think good to maintain both 👍

@jacobBaumbach
Author

Less valuable I think indeed, but for consistency/debugging/etc I think good to maintain both 👍

Awesome. I included it and tested that case. Once I get CI/CD to pass I will mark this as ready. Thank you for your help!

@jacobBaumbach
Author

I am hitting the following error in CI/CD

npm WARN tar ENOSPC: no space left on device, write
npm WARN tar ENOSPC: no space left on device, write
npm ERR! code ENOSPC
npm ERR! syscall write
npm ERR! errno -28
npm ERR! nospc ENOSPC: no space left on device, write
npm ERR! nospc There appears to be insufficient space on your system to finish.
npm ERR! nospc Clear up some disk space and try again.

As seen from the logs here

An issue has already been created for this here, which #9879 appears to fix.

Once #9879 is merged I am pretty confident CI/CD will pass.

jacobBaumbach marked this pull request as ready for review April 2, 2021 21:25

alamb commented Apr 3, 2021

@jacobBaumbach the "no space left on device" error is not caused by this PR.

Some(Partitioning::Hash(mut vector_acc, size_acc)),
Partitioning::Hash(vector, size),
) => {
vector_acc.append(&mut vector.clone());
Contributor

I am not sure this is correct -- if the input is hash partitioned by some expression across size_acc partitions, the output will only be hash partitioned if the hash expressions are the same and the number of output partitions is the same, right?

Contributor

I think so too. Maybe just look at the first input (there are always at least 2 in a union) and compare with the others to check that they have the same expression and the same number of partitions.

Author

Just to make sure I am on the same page:

Two Partitioning::Hash may only be composed if they have the same expression and number of partitions. If the two Partitioning::Hash differ in expression and/or number of partitions then it becomes an UnknownPartitioning. Correct?
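
In code, the rule might look roughly like this (a hedged sketch, not the PR's final implementation: exprs_equal is a hypothetical placeholder, since comparing PhysicalExprs is exactly the open question below, and the partition count to report is still being discussed in this thread):

use std::sync::Arc;
use datafusion::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};

// Hypothetical placeholder: real PhysicalExpr equality does not exist yet.
fn exprs_equal(_a: &[Arc<dyn PhysicalExpr>], _b: &[Arc<dyn PhysicalExpr>]) -> bool {
    unimplemented!()
}

// Report Hash only when every input hashes on the same expressions into the same
// number of partitions; otherwise fall back to UnknownPartitioning.
fn union_output_partitioning(inputs: &[Arc<dyn ExecutionPlan>]) -> Partitioning {
    let total: usize = inputs
        .iter()
        .map(|i| i.output_partitioning().partition_count())
        .sum();
    if let Some(Partitioning::Hash(exprs, n)) = inputs.first().map(|i| i.output_partitioning()) {
        let all_match = inputs.iter().all(|i| match i.output_partitioning() {
            Partitioning::Hash(e, m) => m == n && exprs_equal(&exprs, &e),
            _ => false,
        });
        if all_match {
            return Partitioning::Hash(exprs, total);
        }
    }
    Partitioning::UnknownPartitioning(total)
}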

Contributor

That is my understanding

Author

@alamb I am having trouble figuring out how to check for equality between PhysicalExpr. Do I need to implement a PartialEq or is there a more clever way?

Contributor

🤔 I suspect we will need to implement PartialEq (which you may be able to do via a #[derive(PartialEq)] annotation on PhysicalExpr).

Author

jacobBaumbach commented Apr 8, 2021

I am still new to Rust, so sorry for the beginner questions. Thank you for your patience.

Since PhysicalExpr is a trait and not a struct or enum I am unable to use #[derive(PartialEq)]. Additionally I am not able to do the following: pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq. My guess for why this doesn't work is the same reason the derive isn't working, which is that PhysicalExpr isn't a struct or enum. Do you know how to require all objects that implement a trait to also implement PartialEq, or is there a different way?

Contributor

I would have expected

pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq

To be the correct way as it says that everything that implements PhysicalExpr also needs to implement PartialEq. You would then have to #[derive(PartialEq)] or impl PartialEq for all structs that claim to impl PhysicalExpr

What error did you get when you tried that?

Author

So I added PartialEq and ensured all structs that impl PhysicalExpr also impl PartialEq. For every occurrence of Arc<dyn PhysicalExpr> I get the following error, 132 times:

error[E0038]: the trait `PhysicalExpr` cannot be made into an object
   --> datafusion/src/physical_plan/udf.rs:95:17
    |
95  |     args: &[Arc<dyn PhysicalExpr>],
    |                 ^^^^^^^^^^^^^^^^ `PhysicalExpr` cannot be made into an object
    |
note: for a trait to be "object safe" it needs to allow building a vtable to allow the call to be resolvable dynamically; for more information visit <https://doc.rust-lang.org/reference/items/traits.html#object-safety>
   --> datafusion/src/physical_plan/mod.rs:188:57
    |
188 | pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq {
    |           ------------                                  ^^^^^^^^^ ...because it uses `Self` as a type parameter
    |           |
    |           this trait cannot be made into an object...

It appears that the inclusion of PartialEq makes it so PhysicalExpr doesn't play nicely with dynamic dispatch, because PartialEq takes a type parameter of Self, which can't be determined under dynamic dispatch. I am not sure how to work around this.

Contributor

🤔 it seems Stack Overflow has some context.

Perhaps we need to implement our own version of equality for PhysicalExpr or something.

I am not sure how to proceed, to be honest. @Dandandan any thoughts?
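
One common workaround for this object-safety problem (a hedged sketch with illustrative names, not the actual PhysicalExpr definition) is to keep the trait object-safe and route equality through Any downcasting instead of a PartialEq supertrait:

use std::any::Any;
use std::fmt::{Debug, Display};

// Object-safe trait with an equality hook; "Expr" and "dyn_eq" are illustrative names.
trait Expr: Display + Debug + Send + Sync {
    fn as_any(&self) -> &dyn Any;
    // Compare against another expression by downcasting; if the concrete types
    // differ, the expressions are considered unequal.
    fn dyn_eq(&self, other: &dyn Expr) -> bool;
}

#[derive(Debug, PartialEq)]
struct Column {
    name: String,
}

impl Display for Column {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.name)
    }
}

impl Expr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn dyn_eq(&self, other: &dyn Expr) -> bool {
        other
            .as_any()
            .downcast_ref::<Column>()
            .map(|o| o == self)
            .unwrap_or(false)
    }
}

Because dyn_eq takes &dyn Expr rather than Self, the trait can still be used as Arc<dyn Expr>, and each implementor decides what equality means via downcasting.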

(
Some(Partitioning::RoundRobinBatch(size_acc)),
Partitioning::RoundRobinBatch(size),
) => Some(Partitioning::RoundRobinBatch(size_acc + size)),
Contributor

I think the output batch size might be more precisely described as max(size_acc, size)

though I am unsure of the requirements on an operator that produces RoundRobinBatch(1000) -- does that mean all batches must be 1000? The docs on Partitioning are not super clear.

Contributor

RoundRobinBatch(1000) means it still has 1000 partitions as output after repartitioning with round robin.

Contributor

Dandandan commented Apr 4, 2021

So it doesn't say anything about the batch size, only the number of partitions.
The implementation of the Union creates the sum of input partitions as output, so each partition can be executed in parallel; if we have 10 partitions in a and 20 in b, a UNION b will have 30 partitions.
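
A minimal sketch of that counting (assuming each input exposes its partition count through output_partitioning().partition_count()):

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

// The union's output partition count is the sum of its inputs' counts,
// e.g. 10 partitions in `a` plus 20 in `b` gives 30 for `a UNION b`.
fn union_partition_count(inputs: &[Arc<dyn ExecutionPlan>]) -> usize {
    inputs
        .iter()
        .map(|input| input.output_partitioning().partition_count())
        .sum()
}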


alamb commented Apr 19, 2021

The Apache Arrow Rust community is moving the Rust implementation into its own dedicated GitHub repositories, arrow-rs and arrow-datafusion. It is likely we will not merge this PR into this repository.

Please see the mailing-list thread for more details

We expect the process to take a few days and will follow up with a migration plan for the in-flight PRs.


alamb commented May 3, 2021

#10096 has removed the arrow implementation from this repository (it now resides in https://github.com/apache/arrow-rs and https://github.com/apache/arrow-datafusion) in the hopes of streamlining the development process.

Please re-target this PR (let us know if you need help doing so) to one/both of the new repositories.

Thank you for understanding and helping to make arrow-rs and datafusion better

alamb closed this May 3, 2021