From cc4a0dfc64ea9f3fff3de1597042e5ea3f66cc73 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Wed, 26 Jun 2024 15:33:49 +0200 Subject: [PATCH] perf(rust): faster decode on Parquet HybridRLE (#17208) Co-authored-by: Ritchie Vink --- .../arrow/read/deserialize/binary/decoders.rs | 14 +- .../arrow/read/deserialize/dictionary/mod.rs | 28 +- .../read/deserialize/dictionary/nested.rs | 14 +- .../deserialize/fixed_size_binary/basic.rs | 8 +- .../arrow/read/deserialize/nested_utils.rs | 14 +- .../arrow/read/deserialize/primitive/basic.rs | 4 +- .../src/arrow/read/deserialize/utils.rs | 13 +- .../src/parquet/encoding/bitpacked/decode.rs | 160 +++++++--- .../src/parquet/encoding/bitpacked/mod.rs | 2 +- .../src/parquet/encoding/bitpacked/unpack.rs | 9 + .../encoding/delta_bitpacked/decoder.rs | 14 +- .../parquet/encoding/hybrid_rle/decoder.rs | 4 +- .../src/parquet/encoding/hybrid_rle/mod.rs | 286 ++++++++++++------ .../polars/tests/it/io/parquet/read/binary.rs | 2 + .../tests/it/io/parquet/read/fixed_binary.rs | 2 + .../tests/it/io/parquet/read/primitive.rs | 6 +- .../it/io/parquet/read/primitive_nested.rs | 17 +- .../tests/it/io/parquet/read/struct_.rs | 6 +- .../polars/tests/it/io/parquet/read/utils.rs | 22 +- 19 files changed, 415 insertions(+), 210 deletions(-) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs index a9e373633165..4593fe16eea8 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs @@ -179,13 +179,13 @@ impl<'a> FilteredDelta<'a> { #[derive(Debug)] pub(crate) struct RequiredDictionary<'a> { - pub values: hybrid_rle::HybridRleDecoder<'a>, + pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>, pub dict: &'a BinaryDict, } impl<'a> RequiredDictionary<'a> { pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult { - let values = utils::dict_indices_decoder(page)?; + let values = utils::dict_indices_decoder(page)?.into_iter(); Ok(Self { dict, values }) } @@ -198,13 +198,13 @@ impl<'a> RequiredDictionary<'a> { #[derive(Debug)] pub(crate) struct FilteredRequiredDictionary<'a> { - pub values: SliceFilteredIter>, + pub values: SliceFilteredIter>, pub dict: &'a BinaryDict, } impl<'a> FilteredRequiredDictionary<'a> { pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult { - let values = utils::dict_indices_decoder(page)?; + let values = utils::dict_indices_decoder(page)?.into_iter(); let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); @@ -220,20 +220,20 @@ impl<'a> FilteredRequiredDictionary<'a> { #[derive(Debug)] pub(crate) struct ValuesDictionary<'a> { - pub values: hybrid_rle::HybridRleDecoder<'a>, + pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>, pub dict: &'a BinaryDict, } impl<'a> ValuesDictionary<'a> { pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult { - let values = utils::dict_indices_decoder(page)?; + let values = utils::dict_indices_decoder(page)?.into_iter(); Ok(Self { dict, values }) } #[inline] pub fn len(&self) -> usize { - self.values.size_hint().0 + self.values.len() } } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/dictionary/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/dictionary/mod.rs index f9401771706c..710b30fe0593 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/dictionary/mod.rs +++ 
b/crates/polars-parquet/src/arrow/read/deserialize/dictionary/mod.rs @@ -5,6 +5,9 @@ use std::collections::VecDeque; use arrow::array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray}; use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; +pub use nested::next_dict as nested_next_dict; +use polars_error::{polars_err, PolarsResult}; +use polars_utils::iter::FallibleIterator; use super::utils::{ self, dict_indices_decoder, extend_from_decoder, get_selected_rows, DecodedState, Decoder, @@ -12,7 +15,7 @@ use super::utils::{ }; use super::PagesIter; use crate::parquet::deserialize::SliceFilteredIter; -use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; +use crate::parquet::encoding::hybrid_rle::BufferedHybridRleDecoderIter; use crate::parquet::encoding::Encoding; use crate::parquet::page::{DataPage, DictPage, Page}; use crate::parquet::schema::Repetition; @@ -23,29 +26,32 @@ pub enum State<'a> { Optional(Optional<'a>), Required(Required<'a>), FilteredRequired(FilteredRequired<'a>), - FilteredOptional(FilteredOptionalPageValidity<'a>, HybridRleDecoder<'a>), + FilteredOptional( + FilteredOptionalPageValidity<'a>, + BufferedHybridRleDecoderIter<'a>, + ), } #[derive(Debug)] pub struct Required<'a> { - values: HybridRleDecoder<'a>, + values: BufferedHybridRleDecoderIter<'a>, } impl<'a> Required<'a> { fn try_new(page: &'a DataPage) -> PolarsResult { - let values = dict_indices_decoder(page)?; + let values = dict_indices_decoder(page)?.into_iter(); Ok(Self { values }) } } #[derive(Debug)] pub struct FilteredRequired<'a> { - values: SliceFilteredIter>, + values: SliceFilteredIter>, } impl<'a> FilteredRequired<'a> { fn try_new(page: &'a DataPage) -> PolarsResult { - let values = dict_indices_decoder(page)?; + let values = dict_indices_decoder(page)?.into_iter(); let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); @@ -56,13 +62,13 @@ impl<'a> FilteredRequired<'a> { #[derive(Debug)] pub struct Optional<'a> { - values: HybridRleDecoder<'a>, + values: BufferedHybridRleDecoderIter<'a>, validity: OptionalPageValidity<'a>, } impl<'a> Optional<'a> { fn try_new(page: &'a DataPage) -> PolarsResult { - let values = dict_indices_decoder(page)?; + let values = dict_indices_decoder(page)?.into_iter(); Ok(Self { values, @@ -132,7 +138,7 @@ where (Encoding::PlainDictionary | Encoding::RleDictionary, true, true) => { Ok(State::FilteredOptional( FilteredOptionalPageValidity::try_new(page)?, - dict_indices_decoder(page)?, + dict_indices_decoder(page)?.into_iter(), )) }, _ => Err(utils::not_implemented(page)), @@ -316,7 +322,3 @@ pub(super) fn next_dict Box< }, } } - -pub use nested::next_dict as nested_next_dict; -use polars_error::{polars_err, PolarsResult}; -use polars_utils::iter::FallibleIterator; diff --git a/crates/polars-parquet/src/arrow/read/deserialize/dictionary/nested.rs b/crates/polars-parquet/src/arrow/read/deserialize/dictionary/nested.rs index 8e75eebaff10..f6ec4093c174 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/dictionary/nested.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/dictionary/nested.rs @@ -10,7 +10,7 @@ use super::super::super::PagesIter; use super::super::nested_utils::*; use super::super::utils::{dict_indices_decoder, not_implemented, MaybeNext, PageState}; use super::finish_key; -use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; +use crate::parquet::encoding::hybrid_rle::BufferedHybridRleDecoderIter; use crate::parquet::encoding::Encoding; use crate::parquet::page::{DataPage, DictPage, 
Page}; use crate::parquet::schema::Repetition; @@ -18,13 +18,13 @@ use crate::parquet::schema::Repetition; // The state of a required DataPage with a boolean physical type #[derive(Debug)] pub struct Required<'a> { - values: HybridRleDecoder<'a>, + values: BufferedHybridRleDecoderIter<'a>, length: usize, } impl<'a> Required<'a> { fn try_new(page: &'a DataPage) -> PolarsResult { - let values = dict_indices_decoder(page)?; + let values = dict_indices_decoder(page)?.into_iter(); let length = page.num_values(); Ok(Self { values, length }) } @@ -34,7 +34,7 @@ impl<'a> Required<'a> { #[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum State<'a> { - Optional(HybridRleDecoder<'a>), + Optional(BufferedHybridRleDecoderIter<'a>), Required(Required<'a>), } @@ -89,7 +89,9 @@ impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder { match (page.encoding(), is_optional, is_filtered) { (Encoding::RleDictionary | Encoding::PlainDictionary, true, false) => { - dict_indices_decoder(page).map(State::Optional) + dict_indices_decoder(page) + .map(|v| v.into_iter()) + .map(State::Optional) }, (Encoding::RleDictionary | Encoding::PlainDictionary, false, false) => { Required::try_new(page).map(State::Required) @@ -161,7 +163,7 @@ pub fn next_dict Box MaybeNext::Some(Err(e.into())), Ok(Some(page)) => { - let (page, dict) = match (&dict, page) { + let (page, dict) = match (&dict, &page) { (None, Page::Data(_)) => { return MaybeNext::Some(Err(polars_err!(ComputeError: "not implemented: dictionary arrays from non-dict-encoded pages", diff --git a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs index 97572dfd7cc4..f09fb2011be0 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs @@ -84,13 +84,13 @@ impl<'a> FilteredRequired<'a> { #[derive(Debug)] pub(super) struct RequiredDictionary<'a> { - pub values: hybrid_rle::HybridRleDecoder<'a>, + pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>, pub dict: &'a Dict, } impl<'a> RequiredDictionary<'a> { pub(super) fn try_new(page: &'a DataPage, dict: &'a Dict) -> PolarsResult { - let values = dict_indices_decoder(page)?; + let values = dict_indices_decoder(page)?.into_iter(); Ok(Self { dict, values }) } @@ -103,14 +103,14 @@ impl<'a> RequiredDictionary<'a> { #[derive(Debug)] pub(super) struct OptionalDictionary<'a> { - pub(super) values: hybrid_rle::HybridRleDecoder<'a>, + pub(super) values: hybrid_rle::BufferedHybridRleDecoderIter<'a>, pub(super) validity: OptionalPageValidity<'a>, pub(super) dict: &'a Dict, } impl<'a> OptionalDictionary<'a> { pub(super) fn try_new(page: &'a DataPage, dict: &'a Dict) -> PolarsResult { - let values = dict_indices_decoder(page)?; + let values = dict_indices_decoder(page)?.into_iter(); Ok(Self { values, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs index a73f95f5b4ae..395ea7f8e253 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs @@ -1,4 +1,5 @@ use std::collections::VecDeque; +use std::iter::{Peekable, Zip}; use arrow::array::Array; use arrow::bitmap::MutableBitmap; @@ -7,7 +8,7 @@ use polars_utils::slice::GetSaferUnchecked; use super::super::PagesIter; use super::utils::{DecodedState, MaybeNext, 
PageState}; -use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; +use crate::parquet::encoding::hybrid_rle::{BufferedHybridRleDecoderIter, HybridRleDecoder}; use crate::parquet::page::{split_buffer, DataPage, DictPage, Page}; use crate::parquet::read::levels::get_bit_width; @@ -238,7 +239,7 @@ pub fn init_nested(init: &[InitNested], capacity: usize) -> NestedState { } pub struct NestedPage<'a> { - iter: std::iter::Peekable, HybridRleDecoder<'a>>>, + iter: Peekable, BufferedHybridRleDecoderIter<'a>>>, } impl<'a> NestedPage<'a> { @@ -251,9 +252,12 @@ impl<'a> NestedPage<'a> { let max_def_level = page.descriptor.max_def_level; let reps = - HybridRleDecoder::try_new(rep_levels, get_bit_width(max_rep_level), page.num_values())?; + HybridRleDecoder::new(rep_levels, get_bit_width(max_rep_level), page.num_values()); let defs = - HybridRleDecoder::try_new(def_levels, get_bit_width(max_def_level), page.num_values())?; + HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values()); + + let reps = reps.into_iter(); + let defs = defs.into_iter(); let iter = reps.zip(defs).peekable(); @@ -523,7 +527,7 @@ where } }, Ok(Some(page)) => { - let page = match page { + let page = match &page { Page::Data(page) => page, Page::Dict(dict_page) => { *dict = Some(decoder.deserialize_dict(dict_page)); diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs index a9f32191ada0..4c3655e0a01a 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs @@ -65,7 +65,7 @@ pub(super) struct ValuesDictionary<'a, T> where T: NativeType, { - pub values: hybrid_rle::HybridRleDecoder<'a>, + pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>, pub dict: &'a Vec, } @@ -74,7 +74,7 @@ where T: NativeType, { pub fn try_new(page: &'a DataPage, dict: &'a Vec) -> PolarsResult { - let values = utils::dict_indices_decoder(page)?; + let values = utils::dict_indices_decoder(page)?.into_iter(); Ok(Self { dict, values }) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils.rs index 951203535557..23e45bece122 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/utils.rs @@ -3,7 +3,7 @@ use std::collections::VecDeque; use arrow::bitmap::utils::BitmapIter; use arrow::bitmap::MutableBitmap; use arrow::pushable::Pushable; -use polars_error::{polars_err, to_compute_err, PolarsError, PolarsResult}; +use polars_error::{polars_err, PolarsError, PolarsResult}; use super::super::PagesIter; use crate::parquet::deserialize::{ @@ -424,8 +424,8 @@ pub(super) fn next<'a, I: PagesIter, D: Decoder<'a>>( Err(e) => MaybeNext::Some(Err(e.into())), Ok(Some(page)) => { let page = match page { - Page::Data(page) => page, - Page::Dict(dict_page) => { + Page::Data(ref page) => page, + Page::Dict(ref dict_page) => { *dict = Some(decoder.deserialize_dict(dict_page)); return MaybeNext::More; }, @@ -472,8 +472,11 @@ pub(super) fn dict_indices_decoder(page: &DataPage) -> PolarsResult bool { diff --git a/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs b/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs index 838c4ed1f207..d3361f0f44c0 100644 --- a/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs +++ b/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs 
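// A minimal consumption sketch (not taken from this patch) of the reworked API
// used by `dict_indices_decoder` and the deserializers above: build a
// `HybridRleDecoder`, then either bulk-`collect()` it or drive it element-wise
// through the buffered iterator returned by `into_iter()`. The function name and
// the `encoded`/`num_bits`/`num_values` parameters are placeholders.
use polars_parquet::parquet::encoding::hybrid_rle::HybridRleDecoder;
use polars_parquet::parquet::error::ParquetResult;

fn decode_dict_indices(encoded: &[u8], num_bits: u32, num_values: usize) -> ParquetResult<Vec<u32>> {
    // Bulk path: decode run-by-run straight into a freshly allocated Vec.
    let bulk = HybridRleDecoder::new(encoded, num_bits, num_values).collect()?;

    // Element-wise path: the buffered iterator decodes roughly 64 values at a
    // time into an internal buffer and hands them out one by one; decoding
    // errors are surfaced afterwards through the `FallibleIterator::get_result`
    // hook used elsewhere in this patch.
    let elementwise: Vec<u32> = HybridRleDecoder::new(encoded, num_bits, num_values)
        .into_iter()
        .collect();

    debug_assert_eq!(bulk, elementwise);
    Ok(bulk)
}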
@@ -8,9 +8,36 @@ use crate::parquet::error::ParquetError; pub struct Decoder<'a, T: Unpackable> { packed: std::slice::Chunks<'a, u8>, num_bits: usize, - remaining: usize, // in number of items - current_pack_index: usize, // invariant: < T::PACK_LENGTH - unpacked: T::Unpacked, // has the current unpacked values. + /// number of items + length: usize, + _pd: std::marker::PhantomData, +} + +#[derive(Debug)] +pub struct DecoderIter { + buffer: Vec, + idx: usize, +} + +impl Iterator for DecoderIter { + type Item = T; + + fn next(&mut self) -> Option { + if self.idx >= self.buffer.len() { + return None; + } + + let value = self.buffer[self.idx]; + self.idx += 1; + + Some(value) + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.buffer.len() - self.idx; + + (len, Some(len)) + } } #[inline] @@ -26,11 +53,18 @@ fn decode_pack(packed: &[u8], num_bits: usize, unpacked: &mut T:: impl<'a, T: Unpackable> Decoder<'a, T> { /// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`. - pub fn try_new( - packed: &'a [u8], - num_bits: usize, - mut length: usize, - ) -> Result { + pub fn new(packed: &'a [u8], num_bits: usize, length: usize) -> Self { + Self::try_new(packed, num_bits, length).unwrap() + } + + pub fn collect_into_iter(self) -> DecoderIter { + let mut buffer = Vec::new(); + self.collect_into(&mut buffer); + DecoderIter { buffer, idx: 0 } + } + + /// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`. + pub fn try_new(packed: &'a [u8], num_bits: usize, length: usize) -> Result { let block_size = std::mem::size_of::() * num_bits; if num_bits == 0 { @@ -44,47 +78,77 @@ impl<'a, T: Unpackable> Decoder<'a, T> { ))); } - let mut packed = packed.chunks(block_size); - let mut unpacked = T::Unpacked::zero(); - if let Some(chunk) = packed.next() { - decode_pack::(chunk, num_bits, &mut unpacked); - } else { - length = 0 - }; + let packed = packed.chunks(block_size); Ok(Self { - remaining: length, + length, packed, num_bits, - unpacked, - current_pack_index: 0, + _pd: Default::default(), }) } } -impl<'a, T: Unpackable> Iterator for Decoder<'a, T> { - type Item = T; +impl<'a, T: Unpackable> Decoder<'a, T> { + pub fn collect_into(mut self, vec: &mut Vec) { + // @NOTE: + // When microbenchmarking changing this from a element-wise iterator to a collect into + // improves the speed by around 4x. + // + // The unsafe code here allows us to not have to do a double memcopy. This saves us 20% in + // our microbenchmark. + // + // GB: I did some profiling on this function using the Yellow NYC Taxi dataset. There, the + // average self.length is ~52.8 and the average num_packs is ~2.2. Let this guide your + // decisions surrounding the optimization of this function. - #[inline] // -71% improvement in bench - fn next(&mut self) -> Option { - if self.remaining == 0 { - return None; - } - let result = self.unpacked[self.current_pack_index]; - self.current_pack_index += 1; - self.remaining -= 1; - if self.current_pack_index == T::Unpacked::LENGTH { - if let Some(packed) = self.packed.next() { - decode_pack::(packed, self.num_bits, &mut self.unpacked); - self.current_pack_index = 0; - } + // @NOTE: + // Since T::Unpacked::LENGTH is always a power of two and known at compile time. Division, + // modulo and multiplication are just trivial operators. 
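// A worked example of the rounded-up pack count computed below, with
// hypothetical numbers and assuming `T::Unpacked::LENGTH == 32` (as for `u32`):
//
//     length = 70  =>  num_packs = 70 / 32 + (70 % 32 != 0) as usize = 2 + 1 = 3
//
// so 3 * 32 = 96 slots are reserved, three full packs are decoded into them, and
// the final `set_len` only exposes the 70 values that were actually requested.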
+ let num_packs = (self.length / T::Unpacked::LENGTH) + + usize::from(self.length % T::Unpacked::LENGTH != 0); + + // We reserve enough space here for self.length rounded up to the next multiple of + // T::Unpacked::LENGTH so that we can safely just write into that memory. Otherwise, we + // would have to make a special path where we memcopy twice which is less than ideal. + vec.reserve(num_packs * T::Unpacked::LENGTH); + + // IMPORTANT: This pointer calculation has to appear after the reserve since that reserve + // might move the buffer. + let mut unpacked_ptr = vec.as_mut_ptr().wrapping_add(vec.len()); + + for _ in 0..num_packs { + // This unwrap should never fail since the packed length is checked on initialized of + // the `Decoder`. + let packed = self.packed.next().unwrap(); + + // SAFETY: + // Since we did a `vec::reserve` before with the total length, we know that the memory + // necessary for a `T::Unpacked` is available. + // + // - The elements in this buffer are properly aligned, so elements in a slice will also + // be properly aligned. + // - It is deferencable because it is (i) not null, (ii) in one allocated object, (iii) + // not pointing to deallocated memory, (iv) we do not rely on atomicity and (v) we do + // not read or write beyond the lifetime of `vec`. + // - All data is initialized before reading it. This is not perfect but should not lead + // to any UB. + // - We don't alias the same data from anywhere else at the same time, because we have + // the mutable reference to `vec`. + let unpacked_ref = unsafe { (unpacked_ptr as *mut T::Unpacked).as_mut() }.unwrap(); + + decode_pack::(packed, self.num_bits, unpacked_ref); + + unpacked_ptr = unpacked_ptr.wrapping_add(T::Unpacked::LENGTH); } - Some(result) - } - #[inline] - fn size_hint(&self) -> (usize, Option) { - (self.remaining, Some(self.remaining)) + // SAFETY: + // We have written these elements before so we know that these are available now. + // + // - The capacity is larger since we reserved enough spaced with the opening + // `vec::reserve`. + // - All elements are initialized by the `decode_pack` into the `unpacked_ref`. 
+ unsafe { vec.set_len(vec.len() + self.length) } } } @@ -93,6 +157,14 @@ mod tests { use super::super::tests::case1; use super::*; + impl<'a, T: Unpackable> Decoder<'a, T> { + pub fn collect(self) -> Vec { + let mut vec = Vec::new(); + self.collect_into(&mut vec); + vec + } + } + #[test] fn test_decode_rle() { // Test data: 0-7 with bit width 3 @@ -111,7 +183,7 @@ mod tests { let decoded = Decoder::::try_new(&data, num_bits, length) .unwrap() - .collect::>(); + .collect(); assert_eq!(decoded, vec![0, 1, 2, 3, 4, 5, 6, 7]); } @@ -121,7 +193,7 @@ mod tests { let decoded = Decoder::::try_new(&data, num_bits, expected.len()) .unwrap() - .collect::>(); + .collect(); assert_eq!(decoded, expected); } @@ -133,7 +205,7 @@ mod tests { let decoded = Decoder::::try_new(&data, num_bits, length) .unwrap() - .collect::>(); + .collect(); assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]); } @@ -145,7 +217,7 @@ mod tests { let decoded = Decoder::::try_new(&data, num_bits, length) .unwrap() - .collect::>(); + .collect(); assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]); } @@ -169,7 +241,7 @@ mod tests { let decoded = Decoder::::try_new(&data, num_bits, length) .unwrap() - .collect::>(); + .collect(); assert_eq!(decoded, expected); } @@ -195,7 +267,7 @@ mod tests { let decoded = Decoder::::try_new(&data, num_bits, length) .unwrap() - .collect::>(); + .collect(); assert_eq!(decoded, expected); } diff --git a/crates/polars-parquet/src/parquet/encoding/bitpacked/mod.rs b/crates/polars-parquet/src/parquet/encoding/bitpacked/mod.rs index 72bc89a0838d..4d80a43ed879 100644 --- a/crates/polars-parquet/src/parquet/encoding/bitpacked/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/bitpacked/mod.rs @@ -3,7 +3,7 @@ mod encode; mod pack; mod unpack; -pub use decode::Decoder; +pub use decode::{Decoder, DecoderIter}; pub use encode::{encode, encode_pack}; /// A byte slice (e.g. `[u8; 8]`) denoting types that represent complete packs. diff --git a/crates/polars-parquet/src/parquet/encoding/bitpacked/unpack.rs b/crates/polars-parquet/src/parquet/encoding/bitpacked/unpack.rs index 061b3acef333..ab6dbc00d5a5 100644 --- a/crates/polars-parquet/src/parquet/encoding/bitpacked/unpack.rs +++ b/crates/polars-parquet/src/parquet/encoding/bitpacked/unpack.rs @@ -45,6 +45,11 @@ macro_rules! unpack_impl { ) }; + // @NOTE + // I was surprised too, but this macro vs. a for loop saves around 4.5 - 5x on + // performance in a microbenchmark. Although the code it generates is completely + // insane. There should be something we can do here to make this less code, sane code + // and faster code. seq_macro::seq!(i in 0..$bits { let start_bit = i * NUM_BITS; let end_bit = start_bit + NUM_BITS; @@ -79,6 +84,10 @@ macro_rules! unpack { /// Unpack packed `input` into `output` with a bit width of `num_bits` pub fn $name(input: &[u8], output: &mut [$t; $bits], num_bits: usize) { // This will get optimised into a jump table + // + // @NOTE + // This jumptable appoach saves around 2 - 2.5x on performance over no jumptable and no + // generics. 
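// Concretely, the expansion below is a chain of `if i == num_bits` checks over
// the constants 0..=$bits, each returning the monomorphized unpacker for that
// exact width; because the constants form a dense range, the compiler can lower
// the chain into a single indexed jump rather than sequential comparisons.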
seq_macro::seq!(i in 0..=$bits { if i == num_bits { return $name::unpack::(input, output); diff --git a/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/decoder.rs b/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/decoder.rs index 09e871d78620..ee21a5094718 100644 --- a/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/decoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/decoder.rs @@ -1,6 +1,6 @@ use super::super::{bitpacked, uleb128, zigzag_leb128}; use crate::parquet::encoding::ceil8; -use crate::parquet::error::ParquetError; +use crate::parquet::error::{ParquetError, ParquetResult}; /// An [`Iterator`] of [`i64`] #[derive(Debug)] @@ -15,7 +15,7 @@ struct Block<'a> { remaining: usize, // number of elements current_index: usize, // invariant: < values_per_mini_block // None represents a relative delta of zero, in which case there is no miniblock. - current_miniblock: Option>, + current_miniblock: Option>, // number of bytes consumed. consumed_bytes: usize, } @@ -26,7 +26,7 @@ impl<'a> Block<'a> { num_mini_blocks: usize, values_per_mini_block: usize, length: usize, - ) -> Result { + ) -> ParquetResult { let length = std::cmp::min(length, num_mini_blocks * values_per_mini_block); let mut consumed_bytes = 0; @@ -61,7 +61,7 @@ impl<'a> Block<'a> { Ok(block) } - fn advance_miniblock(&mut self) -> Result<(), ParquetError> { + fn advance_miniblock(&mut self) -> ParquetResult<()> { // unwrap is ok: we sliced it by num_mini_blocks in try_new let num_bits = self.bitwidths.next().copied().unwrap() as usize; @@ -79,7 +79,11 @@ impl<'a> Block<'a> { self.values = remainder; self.consumed_bytes += miniblock_length; - Some(bitpacked::Decoder::try_new(miniblock, num_bits, length).unwrap()) + Some( + bitpacked::Decoder::try_new(miniblock, num_bits, length) + .unwrap() + .collect_into_iter(), + ) } else { None }; diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/decoder.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/decoder.rs index 64ff4dd25a06..f7adaa0ffb33 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/decoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/decoder.rs @@ -77,7 +77,7 @@ mod tests { assert_eq!(values, &[0b00001011]); let result = bitpacked::Decoder::::try_new(values, bit_width, length) .unwrap() - .collect::>(); + .collect(); assert_eq!(result, &[1, 1, 0, 1, 0]); } else { panic!() @@ -103,7 +103,7 @@ mod tests { assert_eq!(values, &[0b11101011, 0b00000010]); let result = bitpacked::Decoder::::try_new(values, bit_width, 10) .unwrap() - .collect::>(); + .collect(); assert_eq!(result, expected); } else { panic!() diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs index 9c58c5f38a3c..149fa3ceaa4e 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs @@ -2,13 +2,15 @@ mod bitmap; mod decoder; mod encoder; + pub use bitmap::{encode_bool as bitpacked_encode, BitmapIter}; pub use decoder::Decoder; pub use encoder::encode; use polars_utils::iter::FallibleIterator; +use polars_utils::slice::GetSaferUnchecked; -use super::bitpacked; -use crate::parquet::error::ParquetError; +use super::{bitpacked, ceil8, uleb128}; +use crate::parquet::error::{ParquetError, ParquetResult}; /// The two possible states of an RLE-encoded run. 
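/// Each run begins with a ULEB128 header: when its least significant bit is 1
/// the run is bit-packed and the remaining bits give the number of groups of 8
/// values; when it is 0 the run is RLE and the remaining bits give how many
/// times a single little-endian value of `ceil8(bit_width)` bytes is repeated.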
#[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -20,118 +22,210 @@ pub enum HybridEncoded<'a> { Rle(&'a [u8], usize), } +/// A decoder for Hybrid-RLE encoded values #[derive(Debug, Clone)] -enum State<'a> { - None, - Bitpacked(bitpacked::Decoder<'a, u32>), - Rle(std::iter::Take>), - // Add a special branch for a single value to - // adhere to the strong law of small numbers. - Single(Option), +pub struct HybridRleDecoder<'a> { + data: &'a [u8], + num_bits: usize, + num_values: usize, } -/// [`Iterator`] of [`u32`] from a byte slice of Hybrid-RLE encoded values +/// A buffered [`Iterator`] of Hybrid-RLE encoded values #[derive(Debug, Clone)] -pub struct HybridRleDecoder<'a> { - decoder: Decoder<'a>, - state: State<'a>, - remaining: usize, - result: Result<(), ParquetError>, -} +pub struct BufferedHybridRleDecoderIter<'a> { + decoder: HybridRleDecoder<'a>, -#[inline] -fn read_next<'a>(decoder: &mut Decoder<'a>, remaining: usize) -> Result, ParquetError> { - Ok(match decoder.next() { - Some(HybridEncoded::Bitpacked(packed)) => { - let num_bits = decoder.num_bits(); - let length = std::cmp::min(packed.len() * 8 / num_bits, remaining); - let decoder = bitpacked::Decoder::::try_new(packed, num_bits, length)?; - State::Bitpacked(decoder) - }, - Some(HybridEncoded::Rle(pack, additional)) => { - let mut bytes = [0u8; std::mem::size_of::()]; - pack.iter().zip(bytes.iter_mut()).for_each(|(src, dst)| { - *dst = *src; - }); - let value = u32::from_le_bytes(bytes); - if additional == 1 { - State::Single(Some(value)) - } else { - State::Rle(std::iter::repeat(value).take(additional)) - } - }, - None => State::None, - }) + buffer: Vec, + buffer_index: usize, + result: Option, } -impl<'a> HybridRleDecoder<'a> { - /// Returns a new [`HybridRleDecoder`] - pub fn try_new(data: &'a [u8], num_bits: u32, num_values: usize) -> Result { - let num_bits = num_bits as usize; - let mut decoder = Decoder::new(data, num_bits); - let state = read_next(&mut decoder, num_values)?; - Ok(Self { - decoder, - state, - remaining: num_values, - result: Ok(()), - }) - } +impl<'a> BufferedHybridRleDecoderIter<'a> { + // @NOTE: + // These were not taken with too much thought to be honest. It might be better to increase + // these because it allows for more buffering at the cost of utilizing more memory. + const BASE_CAPACITY: usize = 128; + const STOP_AT_SIZE: usize = 64; } -impl<'a> Iterator for HybridRleDecoder<'a> { +impl<'a> Iterator for BufferedHybridRleDecoderIter<'a> { type Item = u32; fn next(&mut self) -> Option { - if self.remaining == 0 { + if self.buffer_index < self.buffer.len() { + let value = self.buffer[self.buffer_index]; + self.buffer_index += 1; + return Some(value); + } + + if self.decoder.num_values == 0 { return None; - }; - - loop { - if let Some(result) = match &mut self.state { - State::Single(opt_val) => { - // make sure to take so that next calls will return 'None' - // indicating that the iterator is finished. 
- opt_val.take() - }, - State::Bitpacked(decoder) => decoder.next(), - State::Rle(iter) => iter.next(), - State::None => Some(0), - } { - self.remaining -= 1; - return Some(result); - } + } - self.state = match read_next(&mut self.decoder, self.remaining) { - Ok(state) => state, - Err(e) => { - self.result = Err(e); - return None; - }, + if self.decoder.num_bits == 0 { + self.decoder.num_values -= 1; + return Some(0); + } + + self.buffer.clear(); + self.buffer_index = 1; + while self.buffer.len() < Self::STOP_AT_SIZE && self.decoder.num_values > 0 { + let result = self.decoder.collect_once(&mut self.buffer); + if let Err(err) = result { + self.result = Some(err); + return None; } } + + self.buffer.first().copied() } fn size_hint(&self) -> (usize, Option) { - (self.remaining, Some(self.remaining)) + let size = self.decoder.num_values + self.buffer.len() - self.buffer_index; + (size, Some(size)) } } -impl<'a> FallibleIterator for HybridRleDecoder<'a> { - #[inline] +impl<'a> FallibleIterator for BufferedHybridRleDecoderIter<'a> { fn get_result(&mut self) -> Result<(), ParquetError> { - std::mem::replace(&mut self.result, Ok(())) + match self.result.take() { + None => Ok(()), + Some(err) => Err(err), + } } } -impl<'a> ExactSizeIterator for HybridRleDecoder<'a> {} +impl<'a> ExactSizeIterator for BufferedHybridRleDecoderIter<'a> {} + +impl<'a> IntoIterator for HybridRleDecoder<'a> { + type Item = u32; + type IntoIter = BufferedHybridRleDecoderIter<'a>; + + fn into_iter(self) -> Self::IntoIter { + BufferedHybridRleDecoderIter { + decoder: self, + buffer: Vec::with_capacity(BufferedHybridRleDecoderIter::BASE_CAPACITY), + buffer_index: 0, + result: None, + } + } +} + +impl<'a> HybridRleDecoder<'a> { + /// Returns a new [`HybridRleDecoder`] + pub fn new(data: &'a [u8], num_bits: u32, num_values: usize) -> Self { + Self { + data, + num_bits: num_bits as usize, + num_values, + } + } + + pub fn iter(&self) -> BufferedHybridRleDecoderIter<'a> { + BufferedHybridRleDecoderIter { + decoder: self.clone(), + buffer: Vec::with_capacity(BufferedHybridRleDecoderIter::BASE_CAPACITY), + buffer_index: 0, + result: None, + } + } + + fn collect_once(&mut self, vec: &mut Vec) -> ParquetResult<()> { + // @NOTE: + // This is basically a collapsed version of the `decoder::Decoder`. Any change here + // probably also applies there. In a microbenchmark this collapse did around 3x for this + // specific piece of code, but I think this actually also makes the code more readable. + + debug_assert!(self.num_values > 0); + debug_assert!(self.num_bits > 0); + + let (indicator, consumed) = uleb128::decode(self.data); + self.data = unsafe { self.data.get_unchecked_release(consumed..) }; + + if consumed == 0 { + // We don't step everything at once because that might allocate a lot at once. So, we + // do it in steps. This reasoning might not hold up 100% for just HybridRleDecoder but + // it does for BufferedHybridRleDecoderIter. + // + // @TODO: There might be a better solution for this. 
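// Concretely: a page that promises e.g. 1_000_000 values over an exhausted
// buffer is zero-padded at most MAX_STEP_SIZE values per call here, so the
// buffered iterator (which stops refilling at about 64 buffered values) never
// has to allocate the whole run of padding zeros up front.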
+ + const MAX_STEP_SIZE: usize = 64; + + let step_size = usize::min(self.num_values, MAX_STEP_SIZE); + vec.resize(vec.len() + step_size, 0); + self.num_values -= step_size; + + return Ok(()); + } + + if indicator & 1 == 1 { + // is bitpacking + let bytes = (indicator as usize >> 1) * self.num_bits; + let bytes = std::cmp::min(bytes, self.data.len()); + let (packed, remaining) = self.data.split_at(bytes); + self.data = remaining; + + let length = std::cmp::min(packed.len() * 8 / self.num_bits, self.num_values); + let decoder = bitpacked::Decoder::::try_new(packed, self.num_bits, length)?; + decoder.collect_into(vec); + self.num_values -= length; + } else { + // is rle + let run_length = indicator as usize >> 1; + // repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width) + let rle_bytes = ceil8(self.num_bits); + let (pack, remaining) = self.data.split_at(rle_bytes); + self.data = remaining; + + let mut bytes = [0u8; std::mem::size_of::()]; + pack.iter().zip(bytes.iter_mut()).for_each(|(src, dst)| { + *dst = *src; + }); + let value = u32::from_le_bytes(bytes); + vec.resize(vec.len() + run_length, value); + self.num_values -= run_length; + } + + Ok(()) + } + + #[inline] + pub fn collect_into(mut self, vec: &mut Vec) -> ParquetResult<()> { + // @NOTE: + // When microbenchmarking, this performs around 2x better than using an element-wise + // iterator. + if self.num_values == 0 { + return Ok(()); + } + + if self.num_bits == 0 { + vec.resize(vec.len() + self.num_values, 0); + return Ok(()); + } + + vec.reserve(self.num_values); + + while self.num_values > 0 { + self.collect_once(vec)?; + } + + Ok(()) + } + + pub fn collect(self) -> ParquetResult> { + let mut vec = Vec::new(); + self.collect_into(&mut vec)?; + Ok(vec) + } +} #[cfg(test)] mod tests { + use super::*; #[test] - fn roundtrip() -> Result<(), ParquetError> { + fn roundtrip() -> ParquetResult<()> { let mut buffer = vec![]; let num_bits = 10u32; @@ -139,16 +233,16 @@ mod tests { encode::(&mut buffer, data.iter().cloned(), num_bits).unwrap(); - let decoder = HybridRleDecoder::try_new(&buffer, num_bits, data.len())?; + let decoder = HybridRleDecoder::new(&buffer, num_bits, data.len()); - let result = decoder.collect::>(); + let result = decoder.collect()?; assert_eq!(result, data); Ok(()) } #[test] - fn pyarrow_integration() -> Result<(), ParquetError> { + fn pyarrow_integration() -> ParquetResult<()> { // data encoded from pyarrow representing (0..1000) let data = vec![ 127, 0, 4, 32, 192, 0, 4, 20, 96, 192, 1, 8, 36, 160, 192, 2, 12, 52, 224, 192, 3, 16, @@ -223,51 +317,51 @@ mod tests { ]; let num_bits = 10; - let decoder = HybridRleDecoder::try_new(&data, num_bits, 1000)?; + let decoder = HybridRleDecoder::new(&data, num_bits, 1000); - let result = decoder.collect::>(); + let result = decoder.collect()?; assert_eq!(result, (0..1000).collect::>()); Ok(()) } #[test] - fn small() -> Result<(), ParquetError> { + fn small() -> ParquetResult<()> { let data = vec![3, 2]; let num_bits = 3; - let decoder = HybridRleDecoder::try_new(&data, num_bits, 1)?; + let decoder = HybridRleDecoder::new(&data, num_bits, 1); - let result = decoder.collect::>(); + let result = decoder.collect()?; assert_eq!(result, &[2]); Ok(()) } #[test] - fn zero_bit_width() -> Result<(), ParquetError> { + fn zero_bit_width() -> ParquetResult<()> { let data = vec![3]; let num_bits = 0; - let decoder = HybridRleDecoder::try_new(&data, num_bits, 2)?; + let decoder = HybridRleDecoder::new(&data, num_bits, 2); - let result = 
decoder.collect::>(); + let result = decoder.collect()?; assert_eq!(result, &[0, 0]); Ok(()) } #[test] - fn empty_values() -> Result<(), ParquetError> { + fn empty_values() -> ParquetResult<()> { let data = []; let num_bits = 1; - let decoder = HybridRleDecoder::try_new(&data, num_bits, 100)?; + let decoder = HybridRleDecoder::new(&data, num_bits, 100); - let result = decoder.collect::>(); + let result = decoder.collect()?; assert_eq!(result, vec![0; 100]); Ok(()) diff --git a/crates/polars/tests/it/io/parquet/read/binary.rs b/crates/polars/tests/it/io/parquet/read/binary.rs index a7a7eb4c4e36..63eaf49bd474 100644 --- a/crates/polars/tests/it/io/parquet/read/binary.rs +++ b/crates/polars/tests/it/io/parquet/read/binary.rs @@ -24,11 +24,13 @@ pub fn page_to_vec( .collect(), FixedLenBinaryPageState::RequiredDictionary(dict) => dict .indexes + .iter() .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec()).map(Some)) .collect(), FixedLenBinaryPageState::OptionalDictionary(validity, dict) => { let values = dict .indexes + .iter() .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec())); deserialize_optional(validity, values) }, diff --git a/crates/polars/tests/it/io/parquet/read/fixed_binary.rs b/crates/polars/tests/it/io/parquet/read/fixed_binary.rs index 7158864e21bf..6951e09367ad 100644 --- a/crates/polars/tests/it/io/parquet/read/fixed_binary.rs +++ b/crates/polars/tests/it/io/parquet/read/fixed_binary.rs @@ -21,11 +21,13 @@ pub fn page_to_vec( }, FixedLenBinaryPageState::RequiredDictionary(dict) => dict .indexes + .iter() .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec()).map(Some)) .collect(), FixedLenBinaryPageState::OptionalDictionary(validity, dict) => { let values = dict .indexes + .iter() .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec())); deserialize_optional(validity, values) }, diff --git a/crates/polars/tests/it/io/parquet/read/primitive.rs b/crates/polars/tests/it/io/parquet/read/primitive.rs index 825cdca48526..c11df388ee58 100644 --- a/crates/polars/tests/it/io/parquet/read/primitive.rs +++ b/crates/polars/tests/it/io/parquet/read/primitive.rs @@ -102,10 +102,14 @@ pub fn page_to_vec( NativePageState::Required(values) => Ok(values.map(Some).collect()), NativePageState::RequiredDictionary(dict) => dict .indexes + .iter() .map(|x| dict.dict.value(x as usize).copied().map(Some)) .collect(), NativePageState::OptionalDictionary(validity, dict) => { - let values = dict.indexes.map(|x| dict.dict.value(x as usize).copied()); + let values = dict + .indexes + .iter() + .map(|x| dict.dict.value(x as usize).copied()); deserialize_optional(validity, values) }, }, diff --git a/crates/polars/tests/it/io/parquet/read/primitive_nested.rs b/crates/polars/tests/it/io/parquet/read/primitive_nested.rs index 8b43786ac68d..b336cea2f498 100644 --- a/crates/polars/tests/it/io/parquet/read/primitive_nested.rs +++ b/crates/polars/tests/it/io/parquet/read/primitive_nested.rs @@ -86,9 +86,9 @@ fn read_array_impl>( ), ((Encoding::Rle, false), (Encoding::Rle, true)) => { let num_bits = get_bit_width(rep_level_encoding.1); - let rep_levels = HybridRleDecoder::try_new(rep_levels, num_bits, length)?; + let rep_levels = HybridRleDecoder::new(rep_levels, num_bits, length); compose_array( - rep_levels, + rep_levels.iter(), std::iter::repeat(0).take(length), max_rep_level, max_def_level, @@ -97,10 +97,10 @@ fn read_array_impl>( }, ((Encoding::Rle, true), (Encoding::Rle, false)) => { let num_bits = get_bit_width(def_level_encoding.1); - let def_levels = HybridRleDecoder::try_new(def_levels, 
num_bits, length)?; + let def_levels = HybridRleDecoder::new(def_levels, num_bits, length); compose_array( std::iter::repeat(0).take(length), - def_levels, + def_levels.iter(), max_rep_level, max_def_level, values, @@ -108,9 +108,11 @@ fn read_array_impl>( }, ((Encoding::Rle, false), (Encoding::Rle, false)) => { let rep_levels = - HybridRleDecoder::try_new(rep_levels, get_bit_width(rep_level_encoding.1), length)?; + HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), length) + .iter(); let def_levels = - HybridRleDecoder::try_new(def_levels, get_bit_width(def_level_encoding.1), length)?; + HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), length) + .iter(); compose_array(rep_levels, def_levels, max_rep_level, max_def_level, values) }, _ => todo!(), @@ -182,7 +184,8 @@ fn read_dict_array( let (_, consumed) = uleb128::decode(values); let values = &values[consumed..]; - let indices = bitpacked::Decoder::::try_new(values, bit_width as usize, length as usize)?; + let indices = bitpacked::Decoder::::try_new(values, bit_width as usize, length as usize)? + .collect_into_iter(); let values = indices.map(|id| dict_values[id as usize]); diff --git a/crates/polars/tests/it/io/parquet/read/struct_.rs b/crates/polars/tests/it/io/parquet/read/struct_.rs index 30798b649fe9..3d25dbeefe3d 100644 --- a/crates/polars/tests/it/io/parquet/read/struct_.rs +++ b/crates/polars/tests/it/io/parquet/read/struct_.rs @@ -1,9 +1,9 @@ use polars_parquet::parquet::encoding::hybrid_rle::HybridRleDecoder; -use polars_parquet::parquet::error::ParquetError; +use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::page::{split_buffer, DataPage, EncodedSplitBuffer}; use polars_parquet::parquet::read::levels::get_bit_width; -pub fn extend_validity(val: &mut Vec, page: &DataPage) -> Result<(), ParquetError> { +pub fn extend_validity(val: &mut Vec, page: &DataPage) -> ParquetResult<()> { let EncodedSplitBuffer { rep: _, def: def_levels, @@ -21,7 +21,7 @@ pub fn extend_validity(val: &mut Vec, page: &DataPage) -> Result<(), Parqu ); let mut def_levels = - HybridRleDecoder::try_new(def_levels, get_bit_width(def_level_encoding.1), length)?; + HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), length).iter(); val.reserve(length); def_levels.try_for_each(|x| { diff --git a/crates/polars/tests/it/io/parquet/read/utils.rs b/crates/polars/tests/it/io/parquet/read/utils.rs index 6603627aadba..240303a4024c 100644 --- a/crates/polars/tests/it/io/parquet/read/utils.rs +++ b/crates/polars/tests/it/io/parquet/read/utils.rs @@ -1,5 +1,7 @@ use polars_parquet::parquet::deserialize::{HybridDecoderBitmapIter, HybridEncoded, HybridRleIter}; -use polars_parquet::parquet::encoding::hybrid_rle::{self, BitmapIter, HybridRleDecoder}; +use polars_parquet::parquet::encoding::hybrid_rle::{ + self, BitmapIter, BufferedHybridRleDecoderIter, HybridRleDecoder, +}; use polars_parquet::parquet::error::{ParquetError, ParquetResult}; use polars_parquet::parquet::page::{split_buffer, DataPage, EncodedSplitBuffer}; use polars_parquet::parquet::read::levels::get_bit_width; @@ -23,7 +25,11 @@ pub(super) fn dict_indices_decoder(page: &DataPage) -> ParquetResult { /// that decodes the runs, but not the individual values Bitmap(HybridDecoderBitmapIter<'a>), /// When the maximum definition level is larger than 1 - Levels(HybridRleDecoder<'a>, u32), + Levels(BufferedHybridRleDecoderIter<'a>, u32), } impl<'a> DefLevelsDecoder<'a> { @@ -52,11 +58,9 @@ impl<'a> DefLevelsDecoder<'a> { let iter = 
HybridRleIter::new(iter, page.num_values()); Self::Bitmap(iter) } else { - let iter = HybridRleDecoder::try_new( - def_levels, - get_bit_width(max_def_level), - page.num_values(), - )?; + let iter = + HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values()) + .iter(); Self::Levels(iter, max_def_level as u32) }) } @@ -137,7 +141,7 @@ fn deserialize_bitmap>>( } fn deserialize_levels>>( - levels: HybridRleDecoder, + levels: BufferedHybridRleDecoderIter, max: u32, mut values: I, ) -> Result>, ParquetError> {
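// A self-contained decoding sketch of the API introduced by this patch; the
// function name is a placeholder and the byte buffer is hand-written for
// illustration: a ULEB128 header of 0b0001_0000 (16) has its low bit clear, so
// it is an RLE run repeating the next ceil8(3) = 1 byte value (5) 16 >> 1 = 8
// times.
use polars_parquet::parquet::encoding::hybrid_rle::HybridRleDecoder;
use polars_parquet::parquet::error::ParquetResult;

fn decode_example() -> ParquetResult<()> {
    let num_bits = 3u32;
    let buffer = [0b0001_0000u8, 5]; // one RLE run: the value 5 repeated 8 times

    // Bulk decode: `collect_into` reserves once and appends every run.
    let mut decoded = Vec::new();
    HybridRleDecoder::new(&buffer, num_bits, 8).collect_into(&mut decoded)?;
    assert_eq!(decoded, vec![5u32; 8]);

    // Buffered element-wise decode via `into_iter`.
    let iterated: Vec<u32> = HybridRleDecoder::new(&buffer, num_bits, 8)
        .into_iter()
        .collect();
    assert_eq!(iterated, decoded);

    Ok(())
}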