Push ChunkReader into SerializedPageReader (#2463) #2464

Conversation
let row_count = calculate_row_count(
    offset_index,
    *seen_num_data_pages,
    self.total_num_values,
This was actually previously incorrect: it was using total_num_values instead of total_row_count. We don't currently support skipping with lists (#2122), but this would cause issues if we did.
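A hedged sketch of the distinction, using a hypothetical `calculate_row_count` over the offset index's first_row_index entries (the real function's signature differs):

```rust
/// Hypothetical row-count calculation from an offset index: the rows in
/// page `i` span from its first_row_index to the next page's, or to the
/// chunk's total_row_count for the last page. With lists, value counts
/// and row counts diverge, which is why total_num_values was wrong here.
fn calculate_row_count(first_row_indices: &[i64], i: usize, total_row_count: i64) -> i64 {
    match first_row_indices.get(i + 1) {
        Some(next_first_row) => next_first_row - first_row_indices[i],
        None => total_row_count - first_row_indices[i],
    }
}

fn main() {
    let first_rows = [0i64, 100, 250];
    println!("{}", calculate_row_count(&first_rows, 1, 300)); // middle page
    println!("{}", calculate_row_count(&first_rows, 2, 300)); // last page
}
```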
/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {

/// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
struct TrackedRead<R> {
This is kind of ugly, but the length of the header isn't stored anywhere
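The tracking wrapper can be sketched as a minimal `Read` adapter; this is a hypothetical standalone version, not the crate's actual `TrackedRead`:

```rust
use std::io::Read;

/// Minimal sketch of a reader that counts the bytes consumed, so the
/// header length can be recovered after thrift decoding finishes.
struct TrackedRead<R> {
    inner: R,
    bytes_read: usize,
}

impl<R: Read> Read for TrackedRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Delegate to the wrapped reader and tally whatever it returned.
        let n = self.inner.read(buf)?;
        self.bytes_read += n;
        Ok(n)
    }
}

fn main() {
    let data = [0u8; 16];
    let mut tracked = TrackedRead { inner: &data[..], bytes_read: 0 };
    let mut header = [0u8; 10];
    tracked.read_exact(&mut header).unwrap();
    println!("{}", tracked.bytes_read);
}
```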
Ok(())
}

match &mut self.state {
    SerializedPageReaderState::Values { .. } => Err(general_err!(
        "Must set page_offset_index when using skip_next_page in SerializedPageReader."
    )),
It should now be possible to read the header, and then skip by just incrementing the offset @Ted-Jiang
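A rough sketch of what that skip could look like, assuming a hypothetical cursor and an already-decoded header (none of these names are from the PR):

```rust
/// Hypothetical page cursor: the position within the column chunk and
/// the number of compressed bytes left to read.
struct PageCursor {
    offset: usize,
    remaining: usize,
}

/// Skip a page by advancing past its (already decoded) header and its
/// compressed body, without decompressing or decoding any values.
fn skip_page(cursor: &mut PageCursor, header_len: usize, compressed_size: usize) {
    let total = header_len + compressed_size;
    cursor.offset += total;
    cursor.remaining -= total;
}

fn main() {
    let mut cursor = PageCursor { offset: 100, remaining: 900 };
    skip_page(&mut cursor, 25, 475);
    println!("{} {}", cursor.offset, cursor.remaining);
}
```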
Cool ! 👍
.build()
.unwrap();

let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
This is somewhat cumbersome, but I opted to take ColumnChunkMetadata in the constructor as it is more future-proof, not to mention slightly more obvious what the values are. In particular it avoids ambiguity over whether the value count includes nulls (it does).
let page_len = front.compressed_page_size as usize;

// TODO: Add ChunkReader get_bytes to potentially avoid copy
This is what will allow removing InMemoryRowGroupReader and just using SerializedPageReader. I intend to do this as a follow up
You mean InMemoryColumnChunkReader? Is the idea that we make InMemoryColumnChunk impl ChunkReader?
I think we could just replace InMemoryColumnChunk with SerializedPageReader&lt;Bytes&gt;
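Why the substitution works can be sketched with a simplified trait (not the real parquet `ChunkReader`, whose signature differs): an in-memory buffer can serve the same ranged reads as a file.

```rust
/// Simplified stand-in for a chunk-reader trait; all the page reader
/// really needs is ranged access to the chunk's bytes.
trait ChunkReaderLike {
    fn get_bytes(&self, start: u64, length: usize) -> Vec<u8>;
}

/// A plain in-memory buffer satisfies ranged access just as a file does,
/// which is what lets a Bytes-backed SerializedPageReader stand in for a
/// bespoke in-memory reader.
impl ChunkReaderLike for Vec<u8> {
    fn get_bytes(&self, start: u64, length: usize) -> Vec<u8> {
        let s = start as usize;
        self[s..s + length].to_vec()
    }
}

fn main() {
    let chunk: Vec<u8> = (0..10).collect();
    println!("{:?}", chunk.get_bytes(4, 3));
}
```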
Cool, I was working on #2426. If you want I can rebase onto this branch and do that as part of that ticket
That would be fantastic if you could 👍
Looks reasonable to me but I have not been down in this code for a while. I would defer to @thinkharderdev @Ted-Jiang @nevi-me or @sunchao
Thanks for pushing this along @tustvold 👍
7,
Compression::UNCOMPRESSED,
Type::INT32,
Arc::new(Bytes::from(buf)),
I thought Bytes were already ref counted -- is there any need to wrap this in an additional Arc?
Because the signature needs Arc&lt;T: ChunkReader&gt;, because ChunkReader doesn't impl Clone
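A minimal illustration of the constraint, using a hypothetical trait rather than the crate's: without a Clone bound on the trait, sharing has to happen through the Arc, even when the concrete type (like Bytes) is internally reference counted.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a reader trait that has no Clone bound.
trait Chunk {
    fn len(&self) -> usize;
}

struct Buffer(Vec<u8>);

impl Chunk for Buffer {
    fn len(&self) -> usize {
        self.0.len()
    }
}

/// Taking Arc<T> lets the caller keep a handle to the chunk; a generic
/// T: Chunk alone could not be cloned, since Clone is not a supertrait.
fn reader_len<T: Chunk>(chunk: Arc<T>) -> usize {
    chunk.len()
}

fn main() {
    let chunk = Arc::new(Buffer(vec![0u8; 8]));
    // The Arc clones cheaply even though Buffer does not implement Clone.
    let shared = Arc::clone(&chunk);
    println!("{}", reader_len(shared));
}
```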
// The file source buffer which references exactly the bytes for the column chunk
// to be read by this page reader.
buf: SerializedPages&lt;T&gt;,

pub struct SerializedPageReader&lt;R: ChunkReader&gt; {
This is a non-trivial change, right? To make this different than std::io::Read? But on the other hand the SerializedFileReader isn't changed -- https://docs.rs/parquet/20.0.0/parquet/file/index.html#example-of-reading-an-existing-file

Maybe to ease the transition we can add some docstring example showing how to create a ChunkReader from a std::io::Read? Or maybe it doesn't matter. I think we should be sensitive and over-communicate a change like this to the SerializedPageReader.

@zeevm do you have any comments on this change (especially in regards to your comments on #2394 (comment))?
"This is a non trivial change, right?"

It brings SerializedPageReader into line with all the other readers, so I'm not really sure I agree that this is a major change. I suspect almost all users are using RowGroupReader::get_column_page_reader and not calling this constructor. Tbh I'm not entirely sure why this method is even public... Perhaps I should take the opportunity to make it crate-private as part of slowly reducing the amount of implementation detail that leaks out of the crate?
"Perhaps I should take the opportunity to make it crate private as part of slowly reducing the amount of implementation detail that leaks out of the crate?"

GitHub code search (https://cs.github.com/?q=SerializedPageReader%20language%3ARust&scopeName=All%20repos&scope=) seems to suggest that most of the uses of this structure are forks of the arrow-rs codebase in various states of divergence. Not that GitHub code search would find all the possible issues, but it is a good sanity check that this isn't widely used.
LGTM!
Ok(result)
}

let decompressor = create_codec(meta.compression())?;
let (start, len) = meta.byte_range();
i find

/// Returns the offset and length in bytes of the column chunk within the file
pub fn byte_range(&amp;self) -> (u64, u64) {
    let col_start = match self.dictionary_page_offset() {
        Some(dictionary_page_offset) => dictionary_page_offset,
        None => self.data_page_offset(),
    };
    let col_len = self.compressed_size();
    assert!(
        col_start >= 0 &amp;&amp; col_len >= 0,
        "column start and length should not be negative"
    );
    (col_start as u64, col_len as u64)
}

It may return the dictionary_page_start, which is not right when checking whether it is a dict page
That is intentional, we want it to return the dictionary_page_start if there is one as we use this to construct the dictionary page location.
Oh! forgot there is no dict_offset in locations 😂
Ok(result)
}

let decompressor = create_codec(meta.compression())?;
let (start, len) = meta.byte_range();
This is not correct, as it will potentially miss the dictionary page. I originally tried to use the presence of the dictionary_page_offset to infer the existence of a dictionary page, but this isn't consistently set. In particular, "alltypes_tiny_pages.parquet" does not set the dictionary page offset, and yet its first page is a dictionary page.
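One way to sketch the more robust check (hypothetical types; the real code decodes a thrift PageHeader at the start of the chunk's byte range): look at the first page header's type rather than trusting the optional metadata offset.

```rust
/// Hypothetical page-header type tag; in the real reader this comes
/// from decoding the thrift PageHeader of the chunk's first page.
#[derive(Debug, PartialEq)]
enum PageType {
    DictionaryPage,
    DataPage,
}

/// Decide whether the chunk starts with a dictionary page by inspecting
/// the first decoded header, not the (inconsistently set) metadata
/// dictionary_page_offset field.
fn starts_with_dictionary(first_header: &PageType) -> bool {
    *first_header == PageType::DictionaryPage
}

fn main() {
    println!("{}", starts_with_dictionary(&PageType::DictionaryPage));
    println!("{}", starts_with_dictionary(&PageType::DataPage));
}
```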
Benchmark runs are scheduled for baseline = ecc6210 and contender = 63ab69e. 63ab69e is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Push ChunkReader into SerializedPageReader (#2463)
Which issue does this PR close?
Closes #2463
Rationale for this change
This pushes ChunkReader into SerializedPageReader. This not only simplifies the code, but will allow removing the InMemoryColumnChunkReader in async_reader.rs and implementing page skipping in the absence of a PageIndex (#2460).

What changes are included in this PR?
Are there any user-facing changes?

This changes the interface of SerializedPageReader, which is public, albeit a very low-level API.