Implement projection for arrow IPC Reader file / streams #1339
Codecov Report
@@            Coverage Diff             @@
##           master    #1339      +/-   ##
==========================================
- Coverage   83.03%   82.99%   -0.04%
==========================================
  Files         181      181
  Lines       52949    52985      +36
==========================================
+ Hits        43965    43975      +10
- Misses       8984     9010      +26
arrow/src/ipc/reader.rs
Outdated
let projection = projection.map(|projection| {
    let fields = projection
        .iter()
        .map(|x| schema.fields[*x].clone())
Is it fine that this panics if x >= fields.len()?
Good point, I think this needs an extra check on the projection values to avoid panics.
Perhaps we could reuse Schema::project: https://docs.rs/arrow/9.1.0/arrow/datatypes/struct.Schema.html#method.project (which also handles metadata correctly)
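For readers following the thread, the out-of-bounds concern can be illustrated with a minimal, standard-library-only sketch. The `Field` struct and `project_fields` function below are hypothetical stand-ins written for this illustration; they are not arrow's `Field` type or its `Schema::project` implementation. The sketch only shows the general idea of returning an error for an invalid projection index instead of indexing directly and panicking.

```rust
/// Hypothetical stand-in for a schema field; NOT arrow's `Field` type.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
}

/// Select `fields[i]` for each index in `projection`.
///
/// Uses `slice::get` rather than `fields[i]`, so an out-of-bounds index
/// produces an `Err` instead of a panic.
pub fn project_fields(fields: &[Field], projection: &[usize]) -> Result<Vec<Field>, String> {
    projection
        .iter()
        .map(|&i| {
            fields.get(i).cloned().ok_or_else(|| {
                format!(
                    "projection index {} out of bounds (schema has {} fields)",
                    i,
                    fields.len()
                )
            })
        })
        // Collecting into `Result` stops at the first invalid index.
        .collect()
}
```

With three fields `a`, `b`, `c`, a projection of `[2, 0]` yields `c` then `a`, while `[3]` yields an error rather than a panic.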
@@ -808,6 +848,9 @@ pub struct StreamReader<R: Read> {
    ///
    /// This value is set to `true` the first time the reader's `next()` returns `None`.
    finished: bool,

    /// Optional projection
Suggested change:
- /// Optional projection
+ /// Optional projection and projected schema
arrow/src/ipc/reader.rs
Outdated
@@ -845,11 +888,23 @@ impl<R: Read> StreamReader<R> {
        // Create an array of optional dictionary value arrays, one per field.
        let dictionaries_by_field = vec![None; schema.fields().len()];

        let projection = projection.map(|projection| {
Same here: Schema::project might make this code easier to read.
@@ -922,7 +977,7 @@ impl<R: Read> StreamReader<R> {
                 let mut buf = vec![0; message.bodyLength() as usize];
                 self.reader.read_exact(&mut buf)?;

-                read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field).map(Some)
+                read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field, self.projection.as_ref().map(|x| x.0.as_ref())).map(Some)
Maybe it is worth adding something like
impl<R: Read> StreamReader<R> {
    ...
    /// get projected schema, if any
    pub fn projected_schema(&self) -> Option<&Schema> {
        ...
    }
}
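One way such an accessor could look is sketched below, assuming the reader stores its projection as an `Option<(Vec<usize>, Schema)>` pair (the diff's `x.0` access hints at a tuple, but the exact field type is an assumption). `Schema` and `Reader` here are hypothetical, simplified stand-ins using only the standard library; they are not arrow's `Schema` or `StreamReader`, and the constructor is invented for this illustration.

```rust
/// Hypothetical stand-in for arrow's `Schema`; it holds only field names.
#[derive(Debug, Clone, PartialEq)]
pub struct Schema {
    pub field_names: Vec<String>,
}

/// Hypothetical, simplified reader keeping only the state relevant here.
pub struct Reader {
    /// Full schema of the underlying data.
    schema: Schema,
    /// Optional projection and projected schema.
    projection: Option<(Vec<usize>, Schema)>,
}

impl Reader {
    /// Build a reader, validating every projection index up front.
    pub fn new(schema: Schema, projection: Option<Vec<usize>>) -> Result<Self, String> {
        let projection = match projection {
            Some(indices) => {
                let mut names = Vec::with_capacity(indices.len());
                for &i in &indices {
                    let name = schema
                        .field_names
                        .get(i)
                        .ok_or_else(|| format!("projection index {} out of bounds", i))?;
                    names.push(name.clone());
                }
                Some((indices, Schema { field_names: names }))
            }
            None => None,
        };
        Ok(Self { schema, projection })
    }

    /// The full, unprojected schema.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get the projected schema, if any.
    pub fn projected_schema(&self) -> Option<&Schema> {
        self.projection.as_ref().map(|(_, schema)| schema)
    }
}
```

Storing the projected schema next to the indices means it is computed once at construction time rather than on every batch.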
FWIW I think better error handling could be added as a follow-on PR too.
Nice work @Dandandan
Co-authored-by: Andrew Lamb <[email protected]>
I am sorry this one missed the cutoff for arrow 10.0.0. Perhaps we should merge it in and do the cleanups as a follow-on PR.
Great job!
use project()
and handle error is ok()
…into arrow_ipc_projection
Which issue does this PR close?
Closes #1338
Rationale for this change
Projection can avoid decoding unneeded columns and loading them into arrays (this PR), and could also avoid reading them in the first place (not yet implemented).
What changes are included in this PR?
- Add a projection parameter: try_new(reader: R, projection: Option<Vec<usize>>)
- Change read_record_batch to avoid creating arrays for columns that are not in the projection. We do not yet skip reading the data in the first place.
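The idea of decoding only the projected columns can be sketched as follows. This is a simplified, standard-library-only illustration and not the PR's actual `read_record_batch`: `RawColumn`, `decode_column`, and `read_batch` are hypothetical names, every column is assumed to be a buffer of little-endian `i32` values, and the output follows projection order (an assumption of this sketch). As in the PR, all raw bytes are already in memory; only the decoding step is skipped.

```rust
/// Hypothetical raw column: an encoded buffer of little-endian i32 values.
pub struct RawColumn {
    pub bytes: Vec<u8>,
}

/// Decode one raw column into values (the "create an array" step).
fn decode_column(raw: &RawColumn) -> Vec<i32> {
    raw.bytes
        .chunks_exact(4)
        .map(|c| i32::from_le_bytes([c[0], c[1], c[2], c[3]]))
        .collect()
}

/// Decode a batch. With `projection = None` every column is decoded;
/// otherwise only the listed columns are decoded and the rest are skipped.
pub fn read_batch(
    columns: &[RawColumn],
    projection: Option<&[usize]>,
) -> Result<Vec<Vec<i32>>, String> {
    match projection {
        None => Ok(columns.iter().map(decode_column).collect()),
        Some(indices) => indices
            .iter()
            .map(|&i| {
                columns
                    .get(i)
                    .map(decode_column)
                    .ok_or_else(|| format!("projection index {} out of bounds", i))
            })
            .collect(),
    }
}
```

The `Option<&[usize]>` parameter mirrors the shape passed in the diff above (`self.projection.as_ref().map(|x| x.0.as_ref())`), so callers without a projection simply pass `None`.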
Are there any user-facing changes?
Yes: this adds a second parameter to FileReader::try_new and StreamReader::try_new.