Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement projection for arrow IPC Reader file / streams #1339

Merged
merged 11 commits into from
Mar 9, 2022

Conversation

Dandandan
Copy link
Contributor

@Dandandan Dandandan commented Feb 19, 2022

Which issue does this PR close?

Closes #1338

Rationale for this change

Projection can avoid and loading it into arrays (this PR), and also could avoid reading it in the first place (not yet implemented).

What changes are included in this PR?

  • Changing the signature to try_new(reader: R, projection: Option<Vec<usize>>)
  • Change read_record_batch to avoid creating arrays for columns in the projection.

We do not yet skip reading the data in the first place.

Are there any user-facing changes?

Yes - adding a second parameter to FileReader::new and StreamReader::new

@github-actions github-actions bot added the arrow Changes to the arrow crate label Feb 19, 2022
@github-actions github-actions bot added the arrow-flight Changes to the arrow-flight crate label Feb 19, 2022
@Dandandan Dandandan marked this pull request as draft February 19, 2022 14:56
@Dandandan Dandandan added the api-change Changes to the arrow API label Feb 19, 2022
@codecov-commenter
Copy link

codecov-commenter commented Feb 19, 2022

Codecov Report

Merging #1339 (0341ad8) into master (f4c7102) will decrease coverage by 0.03%.
The diff coverage is 87.65%.

❗ Current head 0341ad8 differs from pull request most recent head b94ef98. Consider uploading reports for the commit b94ef98 to get more accurate results

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #1339      +/-   ##
==========================================
- Coverage   83.03%   82.99%   -0.04%     
==========================================
  Files         181      181              
  Lines       52949    52985      +36     
==========================================
+ Hits        43965    43975      +10     
- Misses       8984     9010      +26     
Impacted Files Coverage Δ
arrow-flight/src/utils.rs 0.00% <0.00%> (ø)
arrow/src/array/data.rs 83.15% <ø> (-0.15%) ⬇️
arrow/src/array/mod.rs 100.00% <ø> (ø)
arrow/src/array/transform/mod.rs 84.65% <ø> (+0.26%) ⬆️
arrow/src/csv/writer.rs 71.32% <0.00%> (-0.82%) ⬇️
...ntegration-testing/src/bin/arrow-file-to-stream.rs 0.00% <0.00%> (ø)
...ion-testing/src/bin/arrow-json-integration-test.rs 0.00% <0.00%> (ø)
...ntegration-testing/src/bin/arrow-stream-to-file.rs 0.00% <0.00%> (ø)
...ng/src/flight_server_scenarios/integration_test.rs 0.00% <0.00%> (ø)
parquet/src/arrow/array_reader.rs 78.27% <0.00%> (ø)
... and 36 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f4c7102...b94ef98. Read the comment docs.

@Dandandan Dandandan marked this pull request as ready for review February 19, 2022 17:39
@Dandandan Dandandan requested review from nevi-me and alamb February 19, 2022 19:18
@Dandandan
Copy link
Contributor Author

Any thoughts @alamb @nevi-me ?

let projection = projection.map(|projection| {
let fields = projection
.iter()
.map(|x| schema.fields[*x].clone())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it fine if this panics if x > fields.len()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I think here needs some extra check on the projection values to avoid panics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could reuse Schema::project: https://docs.rs/arrow/9.1.0/arrow/datatypes/struct.Schema.html#method.project

(which also handles metadata correctly)

arrow/src/ipc/reader.rs Outdated Show resolved Hide resolved
let projection = projection.map(|projection| {
let fields = projection
.iter()
.map(|x| schema.fields[*x].clone())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could reuse Schema::project: https://docs.rs/arrow/9.1.0/arrow/datatypes/struct.Schema.html#method.project

(which also handles metadata correctly)

@@ -808,6 +848,9 @@ pub struct StreamReader<R: Read> {
///
/// This value is set to `true` the first time the reader's `next()` returns `None`.
finished: bool,

/// Optional projection
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Optional projection
/// Optional projection and projected schema

@@ -845,11 +888,23 @@ impl<R: Read> StreamReader<R> {
// Create an array of optional dictionary value arrays, one per field.
let dictionaries_by_field = vec![None; schema.fields().len()];

let projection = projection.map(|projection| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here -- Schema::projection might make this code easier to read

@@ -922,7 +977,7 @@ impl<R: Read> StreamReader<R> {
let mut buf = vec![0; message.bodyLength() as usize];
self.reader.read_exact(&mut buf)?;

read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field).map(Some)
read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field, self.projection.as_ref().map(|x| x.0.as_ref())).map(Some)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it is worth adding something like

impl<R: Read> StreamReader<R> {
...
  /// get projected schema, if any
  pub fn projected_schema(&self) -> Option<&Schema> {
    ...
  }

@alamb
Copy link
Contributor

alamb commented Feb 28, 2022

FWIW I think better error handling could be added as a follow on PR too

Nice work @Dandandan

@alamb
Copy link
Contributor

alamb commented Mar 6, 2022

I am sorry this one missed the cutoff for arrow 10.0.0; Perhaps we should merge it in and do the cleanups as a follow on PR

Copy link
Member

@jackwener jackwener left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great job !
use project() and handle error is ok()

arrow/src/ipc/reader.rs Outdated Show resolved Hide resolved
arrow/src/ipc/reader.rs Outdated Show resolved Hide resolved
@Dandandan Dandandan merged commit 4bcc7a6 into apache:master Mar 9, 2022
@alamb alamb changed the title Implement projection for arrow file / streams Implement projection for arrow IPC Reader file / streams Mar 10, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api-change Changes to the arrow API arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Arrow IPC projection support
5 participants