Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid decoding unneeded values in ByteArrayDecoderDictionary #2154

Merged
merged 1 commit into from
Jul 25, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,10 +601,16 @@ impl ByteArrayDecoderDelta {

/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
pub struct ByteArrayDecoderDictionary {
/// Decoder for the dictionary offsets array
decoder: RleDecoder,

/// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
/// offsets
index_buf: Box<[i32; 1024]>,
/// Current length of `index_buf`
index_buf_len: usize,
/// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed
/// the entire buffer and need to decode another chunk of offsets.
index_offset: usize,

/// This is a maximum as the null count is not always known, e.g. value data from
Expand Down Expand Up @@ -641,6 +647,7 @@ impl ByteArrayDecoderDictionary {

while values_read != len && self.max_remaining_values != 0 {
if self.index_offset == self.index_buf_len {
// We've consumed the entire index buffer so we need to reload it before proceeding
let read = self.decoder.get_batch(self.index_buf.as_mut())?;
if read == 0 {
break;
Expand Down Expand Up @@ -680,20 +687,24 @@ impl ByteArrayDecoderDictionary {
let mut values_skip = 0;
while values_skip < to_skip {
if self.index_offset == self.index_buf_len {
let read = self.decoder.get_batch(self.index_buf.as_mut())?;
if read == 0 {
// Instead of reloading the buffer, just skip in the decoder
let skip = self.decoder.skip(to_skip - values_skip)?;

if skip == 0 {
break;
}
self.index_buf_len = read;
self.index_offset = 0;
}

let skip =
(to_skip - values_skip).min(self.index_buf_len - self.index_offset);
self.max_remaining_values -= skip;
values_skip += skip;
} else {
// We still have indices buffered, so skip within the buffer
let skip =
(to_skip - values_skip).min(self.index_buf_len - self.index_offset);

self.index_offset += skip;
self.max_remaining_values -= skip;
values_skip += skip;
self.index_offset += skip;
self.max_remaining_values -= skip;
values_skip += skip;
}
}
Ok(values_skip)
}
Expand Down