From b340d55ffced4889fc0f2c5c7bc6576f7ddf0f5b Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sun, 24 Jul 2022 15:41:20 -0400 Subject: [PATCH] Avoid decoding values we do not need --- parquet/src/arrow/array_reader/byte_array.rs | 31 +++++++++++++------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 60489a26b93f..34b38e1be8f2 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -601,10 +601,16 @@ impl ByteArrayDecoderDelta { /// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`] pub struct ByteArrayDecoderDictionary { + /// Decoder for the dictionary offsets array decoder: RleDecoder, + /// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded + /// offsets index_buf: Box<[i32; 1024]>, + /// Current length of `index_buf` index_buf_len: usize, + /// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed + /// the entire buffer and need to decode another chunk of offsets. index_offset: usize, /// This is a maximum as the null count is not always known, e.g. value data from @@ -641,6 +647,7 @@ impl ByteArrayDecoderDictionary { while values_read != len && self.max_remaining_values != 0 { if self.index_offset == self.index_buf_len { + // We've consumed the entire index buffer so we need to reload it before proceeding let read = self.decoder.get_batch(self.index_buf.as_mut())?; if read == 0 { break; @@ -680,20 +687,24 @@ impl ByteArrayDecoderDictionary { let mut values_skip = 0; while values_skip < to_skip { if self.index_offset == self.index_buf_len { - let read = self.decoder.get_batch(self.index_buf.as_mut())?; - if read == 0 { + // Instead of reloading the buffer, just skip in the decoder + let skip = self.decoder.skip(to_skip - values_skip)?; + + if skip == 0 { break; } - self.index_buf_len = read; - self.index_offset = 0; - } - let skip = - (to_skip - values_skip).min(self.index_buf_len - self.index_offset); + self.max_remaining_values -= skip; + values_skip += skip; + } else { + // We still have indices buffered, so skip within the buffer + let skip = + (to_skip - values_skip).min(self.index_buf_len - self.index_offset); - self.index_offset += skip; - self.max_remaining_values -= skip; - values_skip += skip; + self.index_offset += skip; + self.max_remaining_values -= skip; + values_skip += skip; + } } Ok(values_skip) }