Implement RecordBatch::concat #461

Closed
alamb opened this issue Jun 16, 2021 · 10 comments · Fixed by #537
Labels
arrow Changes to the arrow crate enhancement Any new improvement worthy of a entry in the changelog good first issue Good for newcomers

Comments

@alamb
Contributor

alamb commented Jun 16, 2021

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
When implementing an operator that needed to check for partitions across RecordBatch boundaries (in https://github.com/influxdata/influxdb_iox/pull/1733), I found myself wanting to concatenate record batches.

In addition, DataFusion needed this code as well: https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/physical_plan/coalesce_batches.rs#L232

Describe the solution you'd like
I propose adding a function like:

impl RecordBatch {
    ...
    /// Concatenate `batches` together into a single record batch
    pub fn concat(batches: &[RecordBatch]) -> Result<RecordBatch>
}

Porting the implementation from DataFusion is probably a good place to start and then adding tests / comments.
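
To make the proposed signature concrete, here is a minimal sketch of column-by-column concatenation. It deliberately avoids the arrow crate: `Batch`, its `fields` and `columns` members, and the `String` error type are hypothetical stand-ins invented for this example, not the arrow-rs types or the DataFusion implementation mentioned above.

```rust
// Toy stand-ins for illustration only; these are NOT the arrow crate's types.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    /// Column names, standing in for a schema.
    pub fields: Vec<String>,
    /// One Vec per column; assumed to have one entry per field.
    pub columns: Vec<Vec<i64>>,
}

impl Batch {
    /// Concatenate `batches` into a single batch, column by column.
    /// Returns an error for empty input or mismatched field names.
    pub fn concat(batches: &[Batch]) -> Result<Batch, String> {
        let first = batches
            .first()
            .ok_or_else(|| "no batches to concatenate".to_string())?;

        // Every batch must share the first batch's field names.
        if let Some(pos) = batches.iter().position(|b| b.fields != first.fields) {
            return Err(format!("batch {} has a different schema", pos));
        }

        // For each column index, append that column from every batch in order.
        let columns: Vec<Vec<i64>> = (0..first.columns.len())
            .map(|i| {
                batches
                    .iter()
                    .flat_map(|b| b.columns[i].iter().copied())
                    .collect::<Vec<i64>>()
            })
            .collect();

        Ok(Batch { fields: first.fields.clone(), columns })
    }
}
```

In this sketch the schema check happens before any data is copied, so a mismatch fails without allocating the output.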

@alamb alamb added good first issue Good for newcomers arrow Changes to the arrow crate enhancement Any new improvement worthy of a entry in the changelog labels Jun 16, 2021
@jorgecarleitao
Member

Wouldn't it make more sense to add this to compute as a normal function? Implementing it on the RecordBatch interface breaks the separation between compute and data that we enforce with the compute module.

@alamb
Contributor Author

alamb commented Jul 11, 2021

> Wouldn't it make more sense to add this to compute as a normal function? Implementing it on the RecordBatch interface breaks the separation between compute and data that we enforce with the compute module.

@jorgecarleitao I think that comes down to "is concat a construction operation or a compute operation"? It could probably be classified as either. I don't have a strong opinion about where the "concat RecordBatch" function goes.

The only other prior art of using RecordBatches in compute kernels I found is pub fn filter_record_batch in filter.rs:
https://github.com/apache/arrow-rs/blob/master/arrow/src/compute/kernels/filter.rs#L279-L298

We could perhaps add a concat_batches function in compute/kernels/concat.rs

@nevi-me
Contributor

nevi-me commented Jul 11, 2021

I'd lean towards RecordBatch::filter and RecordBatch::concat as long as that ends up being a convenience that iterates through all columns, and calls the underlying compute or other function.

Similar to a RecordBatch::slice that calls array.slice internally.

I think this would aid discoverability, in the sense that a person asking "what can I do with a record batch?" would look at impl RecordBatch and discover that easily.
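
The "convenience method that calls the underlying per-column function" pattern described above can be sketched with a slice example. This is a toy model under stated assumptions: `Column`, `Batch`, and their members are hypothetical names for this example only, and the `Arc`-shared buffer with an offset and length is just one simple way to get a slice that copies no values.

```rust
use std::sync::Arc;

// Toy stand-ins for illustration only; these are NOT the arrow crate's types.
#[derive(Debug, Clone)]
pub struct Column {
    data: Arc<Vec<i64>>, // shared, immutable buffer
    offset: usize,       // start of this view within `data`
    len: usize,          // number of values in this view
}

impl Column {
    pub fn new(values: Vec<i64>) -> Self {
        let len = values.len();
        Column { data: Arc::new(values), offset: 0, len }
    }

    /// Constant-time slice: bumps a reference count and adjusts
    /// offset/len; no values are copied.
    pub fn slice(&self, offset: usize, len: usize) -> Column {
        assert!(offset + len <= self.len, "slice out of bounds");
        Column { data: Arc::clone(&self.data), offset: self.offset + offset, len }
    }

    pub fn values(&self) -> &[i64] {
        &self.data[self.offset..self.offset + self.len]
    }
}

pub struct Batch {
    pub columns: Vec<Column>,
}

impl Batch {
    /// Convenience wrapper: delegates to `Column::slice` for every column,
    /// so the cost grows with the number of columns, not the number of rows.
    pub fn slice(&self, offset: usize, len: usize) -> Batch {
        Batch {
            columns: self.columns.iter().map(|c| c.slice(offset, len)).collect(),
        }
    }
}
```

The wrapper adds no logic of its own; it exists so that someone browsing `impl Batch` finds the operation there.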

@alamb
Contributor Author

alamb commented Jul 11, 2021

> I think this would aid discoverability, in the sense that a person asking "what can I do with a record batch?" would look at impl RecordBatch and discover that easily.

I am convinced by this argument 👍

@jorgecarleitao
Member

Let me try to explain my reasoning atm.

All methods exposed on Array are O(1). In particular, .slice is O(1) over the array, and thus O(c) over the record where c is the number of fields.

concat over RecordBatch seems rather simple but is O(c * n * r) where c is the number of columns, r the number of records, and n the average length of the records. Since c is trivially parallelizable, I would say that the natural implementation is to actually rayon it, i.e. columns().par_iter()...

Generally, I consider non-parallel iterations over a record to be an anti-pattern, since parallelism over columns is one of the hallmarks of columnar formats. Imo the decision of how to iterate over columns does not belong to arrow-rs, but to Polars, DataFusion and the like. DataFusion uses iter; polars uses par_iter for the most part.

We do have some methods in compute that iter over the RecordBatch that follow this pattern (filter and sort I believe). So, in this context, I would be more inclined to place concat_record at the same level as them: methods that are not O(1) over the arrays' length that some may use when they do not want to commit to a multi-threaded execution. But again, imo this is an anti-pattern that we should not promote, as it enforces a specific threading model over columns.

The reasoning to have it in compute is to not drag compute dependencies into the core modules (I see them as being the datatypes, array, buffer, RecordBatch, alloc). The reason is that compute has a massive compile time compared to the rest of the crate, and keeping these separated makes it easier to split arrow into two crates (arrow and arrow-compute) to reduce compile time and/or binary size. This is minor and can be solved by moving impl RecordBatch to compute::kernels::concat if the need arises.
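
The point about leaving the iteration strategy to the caller can be illustrated with a small sketch. It assumes a per-column kernel (`concat_column`, a hypothetical name) and shows two callers: one walking columns sequentially and one spawning a scoped thread per column. The standard library's `std::thread::scope` is used here only as a dependency-free stand-in for a rayon-style parallel iterator; none of these functions are arrow-rs APIs.

```rust
use std::thread;

/// Per-column kernel: concatenate one column's chunks (one chunk per batch).
pub fn concat_column(chunks: &[&[i64]]) -> Vec<i64> {
    let total: usize = chunks.iter().map(|c| c.len()).sum();
    let mut out = Vec::with_capacity(total);
    for chunk in chunks {
        out.extend_from_slice(chunk);
    }
    out
}

/// Sequential caller: walks the columns on the current thread.
/// Each batch is modelled as a Vec of columns; all batches are assumed
/// to have at least `num_columns` columns.
pub fn concat_sequential(batches: &[Vec<Vec<i64>>], num_columns: usize) -> Vec<Vec<i64>> {
    (0..num_columns)
        .map(|i| {
            let chunks: Vec<&[i64]> = batches.iter().map(|b| b[i].as_slice()).collect();
            concat_column(&chunks)
        })
        .collect()
}

/// Parallel caller: one scoped thread per column, same kernel.
pub fn concat_parallel(batches: &[Vec<Vec<i64>>], num_columns: usize) -> Vec<Vec<i64>> {
    thread::scope(|s| {
        // Spawn all workers first, then join them in column order.
        let handles: Vec<_> = (0..num_columns)
            .map(|i| {
                s.spawn(move || {
                    let chunks: Vec<&[i64]> =
                        batches.iter().map(|b| b[i].as_slice()).collect();
                    concat_column(&chunks)
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("column worker panicked"))
            .collect::<Vec<Vec<i64>>>()
    })
}
```

Both callers share the same kernel, so the choice of threading model stays outside of it. A thread per column is only for illustration; a real engine would more likely use a thread pool.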

@ritchie46
Contributor

I think a lean arrow-core crate would be beneficial.
We already started some work by feature gating IO logic.

Wouldn't the same be achieved by impl FromIterator<ArrayRef> for RecordBatch? Then users can use the concat kernel and choose how to iter the columns. They also don't pay the compilation penalty until they actually implement that iterator + collect.

@alamb
Contributor Author

alamb commented Jul 12, 2021

I think there is a tradeoff between "making the arrow crate accessible (easily usable) for newcomers" and "making it hard for users to write non-optimal code".

From where I sit, there is nothing about adding RecordBatch::concat or concat_batches that prevents other iteration methods over columns of a RecordBatch if needed (e.g. by polars, etc).

Splitting arrow-core and arrow-compute is a good idea in my opinion -- perhaps it is worthy of a ticket.

@alamb
Contributor Author

alamb commented Jul 12, 2021

BTW impl FromIterator<ArrayRef> for RecordBatch would need some way to name the fields of the Schema in the created RecordBatch.
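
One way to address the naming concern, sketched on a toy model: collect `(name, column)` pairs rather than bare columns. Everything here is an assumption made for illustration. `Batch` and `concat_by_collect` are hypothetical names, and this is not an existing arrow-rs impl.

```rust
// Toy stand-ins for illustration only; these are NOT the arrow crate's types.
#[derive(Debug, PartialEq)]
pub struct Batch {
    pub fields: Vec<String>,
    pub columns: Vec<Vec<i64>>,
}

/// Collecting bare columns would leave the fields unnamed, so this sketch
/// collects `(name, column)` pairs instead.
impl FromIterator<(String, Vec<i64>)> for Batch {
    fn from_iter<I: IntoIterator<Item = (String, Vec<i64>)>>(iter: I) -> Self {
        let (fields, columns): (Vec<String>, Vec<Vec<i64>>) = iter.into_iter().unzip();
        Batch { fields, columns }
    }
}

/// Caller-driven concat: the caller decides how to walk the columns and
/// then collects the results into a `Batch`.
/// Assumes every batch has the same fields as the first one.
pub fn concat_by_collect(batches: &[Batch]) -> Option<Batch> {
    let first = batches.first()?;
    let batch = first
        .fields
        .iter()
        .enumerate()
        .map(|(i, name)| {
            // Append column `i` from every batch, in order.
            let column: Vec<i64> = batches
                .iter()
                .flat_map(|b| b.columns[i].iter().copied())
                .collect();
            (name.clone(), column)
        })
        .collect::<Batch>();
    Some(batch)
}
```

With this shape, the library only provides the `collect` target and the per-column work, while the iteration over columns lives in caller code.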

@pkit

pkit commented Jun 15, 2024

@alamb will it be available in pyarrow? Right now it's kinda PITA to concat RecordBatch instances there. It goes like: from batches -> to table -> to batches again.

@alamb
Contributor Author

alamb commented Jun 17, 2024

Hi @pkit 👋 -- pyarrow is developed as part of the arrow mono repo https://github.com/apache/arrow and uses the C++ implementation of Arrow (not the Rust implementation, which is what is in this repo).

Thus, I recommend asking there.

5 participants