From b34771751327f9348be4e045f2e722055fd4b3a1 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Mon, 8 Jul 2024 08:49:04 +0200 Subject: [PATCH] perf: Collect Parquet dictionary binary as view (#17475) --- crates/polars-arrow/src/array/binview/mod.rs | 2 +- .../polars-arrow/src/array/binview/mutable.rs | 12 +++ crates/polars-arrow/src/array/binview/view.rs | 86 +++++++++++++---- crates/polars-arrow/src/array/mod.rs | 2 +- .../arrow/read/deserialize/binary/decoders.rs | 1 + .../arrow/read/deserialize/binview/basic.rs | 92 +++++++++++++------ .../src/arrow/read/deserialize/utils.rs | 41 ++++++++- .../src/parquet/encoding/hybrid_rle/mod.rs | 4 +- .../parquet/encoding/hybrid_rle/translator.rs | 48 ++++++++++ .../polars/tests/it/io/parquet/read/binary.rs | 10 +- .../tests/it/io/parquet/read/primitive.rs | 13 +-- 11 files changed, 252 insertions(+), 59 deletions(-) diff --git a/crates/polars-arrow/src/array/binview/mod.rs b/crates/polars-arrow/src/array/binview/mod.rs index deeda0df6c08..38888299b11b 100644 --- a/crates/polars-arrow/src/array/binview/mod.rs +++ b/crates/polars-arrow/src/array/binview/mod.rs @@ -34,7 +34,7 @@ use crate::array::iterator::NonNullValuesIter; use crate::bitmap::utils::{BitmapIter, ZipValidity}; pub type BinaryViewArray = BinaryViewArrayGeneric<[u8]>; pub type Utf8ViewArray = BinaryViewArrayGeneric; -pub use view::{View, INLINE_VIEW_SIZE}; +pub use view::View; use super::Splitable; diff --git a/crates/polars-arrow/src/array/binview/mutable.rs b/crates/polars-arrow/src/array/binview/mutable.rs index 25482754337a..891f8e6075e8 100644 --- a/crates/polars-arrow/src/array/binview/mutable.rs +++ b/crates/polars-arrow/src/array/binview/mutable.rs @@ -188,6 +188,18 @@ impl MutableBinaryViewArray { self.views.push(value); } + #[inline] + pub fn push_buffer(&mut self, buffer: Buffer) -> u32 { + if !self.in_progress_buffer.is_empty() { + self.completed_buffers + .push(Buffer::from(std::mem::take(&mut self.in_progress_buffer))); + } + + let buffer_idx = self.completed_buffers.len(); + self.completed_buffers.push(buffer); + buffer_idx as u32 + } + #[inline] pub fn push_value>(&mut self, value: V) { if let Some(validity) = &mut self.validity { diff --git a/crates/polars-arrow/src/array/binview/view.rs b/crates/polars-arrow/src/array/binview/view.rs index ccb771d2417d..fd205d8ce508 100644 --- a/crates/polars-arrow/src/array/binview/view.rs +++ b/crates/polars-arrow/src/array/binview/view.rs @@ -1,5 +1,5 @@ use std::cmp::Ordering; -use std::fmt::{Display, Formatter}; +use std::fmt::{self, Display, Formatter}; use std::ops::Add; use bytemuck::{Pod, Zeroable}; @@ -13,10 +13,12 @@ use crate::buffer::Buffer; use crate::datatypes::PrimitiveType; use crate::types::NativeType; -pub const INLINE_VIEW_SIZE: u32 = 12; - // We use this instead of u128 because we want alignment of <= 8 bytes. -#[derive(Debug, Copy, Clone, Default)] +/// A reference to a set of bytes. +/// +/// If `length <= 12`, these bytes are inlined over the `prefix`, `buffer_idx` and `offset` fields. +/// If `length > 12`, these fields specify a slice of a buffer. +#[derive(Copy, Clone, Default)] #[repr(C)] pub struct View { /// The length of the string/bytes. @@ -29,29 +31,77 @@ pub struct View { pub offset: u32, } +impl fmt::Debug for View { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.length <= Self::MAX_INLINE_SIZE { + fmt.debug_struct("View") + .field("length", &self.length) + .field("content", &unsafe { + std::slice::from_raw_parts( + (self as *const _ as *const u8).add(4), + self.length as usize, + ) + }) + .finish() + } else { + fmt.debug_struct("View") + .field("length", &self.length) + .field("prefix", &self.prefix.to_be_bytes()) + .field("buffer_idx", &self.buffer_idx) + .field("offset", &self.offset) + .finish() + } + } +} + impl View { + pub const MAX_INLINE_SIZE: u32 = 12; + #[inline(always)] pub fn as_u128(self) -> u128 { unsafe { std::mem::transmute(self) } } + /// Create a new inline view + /// + /// # Panics + /// + /// Panics if the `bytes.len() > View::MAX_INLINE_SIZE`. + #[inline] + pub fn new_inline(bytes: &[u8]) -> Self { + debug_assert!(bytes.len() <= u32::MAX as usize); + assert!(bytes.len() as u32 <= Self::MAX_INLINE_SIZE); + + let mut view = Self { + length: bytes.len() as u32, + ..Default::default() + }; + + let view_ptr = &mut view as *mut _ as *mut u8; + + // SAFETY: + // - bytes length <= 12, + // - size_of:: == 16 + // - View is laid out as [length, prefix, buffer_idx, offset] (using repr(C)) + // - By grabbing the view_ptr and adding 4, we have provenance over prefix, buffer_idx and + // offset. (i.e. the same could not be achieved with &mut self.prefix as *mut _ as *mut u8) + unsafe { + let inline_data_ptr = view_ptr.add(4); + core::ptr::copy_nonoverlapping(bytes.as_ptr(), inline_data_ptr, bytes.len()); + } + view + } + #[inline] pub fn new_from_bytes(bytes: &[u8], buffer_idx: u32, offset: u32) -> Self { - if bytes.len() <= 12 { - let mut ret = Self { - length: bytes.len() as u32, - ..Default::default() - }; - let ret_ptr = &mut ret as *mut _ as *mut u8; - unsafe { - core::ptr::copy_nonoverlapping(bytes.as_ptr(), ret_ptr.add(4), bytes.len()); - } - ret + debug_assert!(bytes.len() <= u32::MAX as usize); + + if bytes.len() as u32 <= Self::MAX_INLINE_SIZE { + Self::new_inline(bytes) } else { - let prefix_buf: [u8; 4] = std::array::from_fn(|i| *bytes.get(i).unwrap_or(&0)); Self { length: bytes.len() as u32, - prefix: u32::from_le_bytes(prefix_buf), + prefix: u32::from_le_bytes(bytes[0..4].try_into().unwrap()), buffer_idx, offset, } @@ -190,8 +240,8 @@ where { for view in views { let len = view.length; - if len <= INLINE_VIEW_SIZE { - if len < INLINE_VIEW_SIZE && view.as_u128() >> (32 + len * 8) != 0 { + if len <= View::MAX_INLINE_SIZE { + if len < View::MAX_INLINE_SIZE && view.as_u128() >> (32 + len * 8) != 0 { polars_bail!(ComputeError: "view contained non-zero padding in prefix"); } diff --git a/crates/polars-arrow/src/array/mod.rs b/crates/polars-arrow/src/array/mod.rs index 0dd22cf51d9e..c2c0c958032d 100644 --- a/crates/polars-arrow/src/array/mod.rs +++ b/crates/polars-arrow/src/array/mod.rs @@ -763,7 +763,7 @@ mod values; pub use binary::{BinaryArray, BinaryValueIter, MutableBinaryArray, MutableBinaryValuesArray}; pub use binview::{ BinaryViewArray, BinaryViewArrayGeneric, MutableBinaryViewArray, MutablePlBinary, - MutablePlString, Utf8ViewArray, View, ViewType, INLINE_VIEW_SIZE, + MutablePlString, Utf8ViewArray, View, ViewType, }; pub use boolean::{BooleanArray, MutableBooleanArray}; pub use dictionary::{DictionaryArray, DictionaryKey, MutableDictionaryArray}; diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs index d80dd6791d8b..97c1548b7df3 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs @@ -283,6 +283,7 @@ pub(crate) fn deserialize_plain(values: &[u8], num_values: usize) -> BinaryDict for v in all { dict_values.push(v) } + dict_values.into() } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs index e24cc576b005..3e0587d2ee17 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs @@ -1,16 +1,19 @@ use std::cell::Cell; use std::collections::VecDeque; -use arrow::array::{Array, ArrayRef, BinaryViewArray, MutableBinaryViewArray, Utf8ViewArray}; +use arrow::array::{Array, ArrayRef, BinaryViewArray, MutableBinaryViewArray, Utf8ViewArray, View}; use arrow::bitmap::{Bitmap, MutableBitmap}; use arrow::datatypes::{ArrowDataType, PhysicalType}; use polars_error::PolarsResult; use polars_utils::iter::FallibleIterator; use super::super::binary::decoders::*; +use crate::parquet::encoding::hybrid_rle::BinaryDictionaryTranslator; +use crate::parquet::error::ParquetError; use crate::parquet::page::{DataPage, DictPage}; -use crate::read::deserialize::utils; -use crate::read::deserialize::utils::{extend_from_decoder, next, DecodedState, MaybeNext}; +use crate::read::deserialize::utils::{ + self, extend_from_decoder, next, DecodedState, MaybeNext, TranslatedHybridRle, +}; use crate::read::{PagesIter, PrimitiveLogicalType}; type DecodedStateTuple = (MutableBinaryViewArray<[u8]>, MutableBitmap); @@ -102,33 +105,78 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder { BinaryState::OptionalDictionary(page_validity, page_values) => { // Already done on the dict. validate_utf8 = false; + let page_dict = &page_values.dict; + let offsets = page_dict.offsets(); + + // @NOTE: If there is no lengths (i.e. 0-1 offset), then we will have only nulls. + let max_length = offsets.lengths().max().unwrap_or(0); + + // We do not have to push the buffer if all elements fit as inline views. + let buffer_idx = if max_length <= View::MAX_INLINE_SIZE as usize { + 0 + } else { + values.push_buffer(page_dict.values().clone()) + }; + + // @NOTE: we could potentially use the View::new_inline function here, but that + // would require two collectors & two translators. So I don't think it is worth + // it. + let translator = BinaryDictionaryTranslator { + dictionary: page_dict, + buffer_idx, + }; + let collector = TranslatedHybridRle::new(&mut page_values.values, &translator); + utils::extend_from_decoder( validity, page_validity, Some(additional), values, - &mut page_values - .values - .by_ref() - .map(|index| page_dict.value(index as usize)), + collector, )?; - page_values.values.get_result()?; }, BinaryState::RequiredDictionary(page) => { // Already done on the dict. validate_utf8 = false; + let page_dict = &page.dict; + let offsets = page_dict.offsets(); - for x in page - .values - .by_ref() - .map(|index| page_dict.value(index as usize)) - .take(additional) - { - values.push_value_ignore_validity(x) + if let Some(max_length) = offsets.lengths().max() { + // We do not have to push the buffer if all elements fit as inline views. + let buffer_idx = if max_length <= View::MAX_INLINE_SIZE as usize { + 0 + } else { + values.push_buffer(page_dict.values().clone()) + }; + + // @NOTE: we could potentially use the View::new_inline function here, but that + // would require two collectors & two translators. So I don't think it is worth + // it. + let translator = BinaryDictionaryTranslator { + dictionary: page_dict, + buffer_idx, + }; + + page.values.translate_and_collect_n_into( + values.views_mut(), + additional, + &translator, + )?; + if let Some(validity) = values.validity() { + validity.extend_constant(additional, true); + } + } else { + // @NOTE: If there are no dictionary items, there is no way we can look up + // items. + if additional != 0 { + return Err(ParquetError::oos( + "Attempt to search items with empty dictionary", + ) + .into()); + } } - page.values.get_result()?; }, BinaryState::FilteredOptional(page_validity, page_values) => { extend_from_decoder( @@ -273,17 +321,7 @@ pub(super) fn finish( } match data_type.to_physical_type() { - PhysicalType::BinaryView => unsafe { - Ok(BinaryViewArray::new_unchecked( - data_type.clone(), - array.views().clone(), - array.data_buffers().clone(), - array.validity().cloned(), - array.total_bytes_len(), - array.total_buffer_len(), - ) - .boxed()) - }, + PhysicalType::BinaryView => Ok(array.boxed()), PhysicalType::Utf8View => { // SAFETY: we already checked utf8 unsafe { diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils.rs index f8a9ea87fbd4..994e31571111 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/utils.rs @@ -1,5 +1,6 @@ use std::collections::VecDeque; +use arrow::array::{MutableBinaryViewArray, View}; use arrow::bitmap::utils::BitmapIter; use arrow::bitmap::MutableBitmap; use arrow::pushable::Pushable; @@ -322,7 +323,7 @@ fn reserve_pushable_and_validity<'a, I, T, C: BatchableCollector>( } /// Extends a [`Pushable`] from an iterator of non-null values and an hybrid-rle decoder -pub(super) fn extend_from_decoder>( +pub(super) fn extend_from_decoder>( validity: &mut MutableBitmap, page_validity: &mut dyn PageValidity, limit: Option, @@ -431,6 +432,44 @@ where } } +impl<'a, 'b, 'c, T> BatchableCollector> + for TranslatedHybridRle<'a, 'b, 'c, View, T> +where + T: Translator, +{ + #[inline] + fn reserve(target: &mut MutableBinaryViewArray<[u8]>, n: usize) { + target.reserve(n); + } + + #[inline] + fn push_n(&mut self, target: &mut MutableBinaryViewArray<[u8]>, n: usize) -> ParquetResult<()> { + self.decoder + .translate_and_collect_n_into(target.views_mut(), n, self.translator)?; + + if let Some(validity) = target.validity() { + validity.extend_constant(n, true); + } + + Ok(()) + } + + #[inline] + fn push_n_nulls( + &mut self, + target: &mut MutableBinaryViewArray<[u8]>, + n: usize, + ) -> ParquetResult<()> { + target.extend_null(n); + Ok(()) + } + + #[inline] + fn skip_n(&mut self, n: usize) -> ParquetResult<()> { + self.decoder.skip_in_place(n) + } +} + impl, I: Iterator> BatchableCollector for I { #[inline] fn reserve(target: &mut P, n: usize) { diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs index 6dbc22f857ba..079f600fd45a 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs @@ -14,7 +14,9 @@ pub use decoder::Decoder; pub use encoder::encode; use polars_utils::iter::FallibleIterator; use polars_utils::slice::GetSaferUnchecked; -pub use translator::{DictionaryTranslator, Translator, UnitTranslator}; +pub use translator::{ + BinaryDictionaryTranslator, DictionaryTranslator, FnTranslator, Translator, UnitTranslator, +}; use self::buffered::HybridRleBuffered; use super::{bitpacked, ceil8, uleb128}; diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/translator.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/translator.rs index a49351072a45..b36df14978cc 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/translator.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/translator.rs @@ -1,3 +1,5 @@ +use arrow::array::{BinaryArray, View}; + use crate::parquet::encoding::bitpacked::{Decoder, Unpackable, Unpacked}; use crate::parquet::encoding::hybrid_rle::{BufferedBitpacked, HybridRleBuffered}; use crate::parquet::error::{ParquetError, ParquetResult}; @@ -239,6 +241,52 @@ impl<'a, T: Copy> Translator for DictionaryTranslator<'a, T> { } } +/// This is a binary dictionary translation variant of [`Translator`]. +/// +/// All the [`HybridRleDecoder`] values are regarded as a offset into a binary array regarded as a +/// dictionary. +/// +/// [`HybridRleDecoder`]: super::HybridRleDecoder +pub struct BinaryDictionaryTranslator<'a> { + pub dictionary: &'a BinaryArray, + pub buffer_idx: u32, +} + +impl<'a> Translator for BinaryDictionaryTranslator<'a> { + fn translate(&self, index: u32) -> ParquetResult { + if index as usize >= self.dictionary.len() { + return Err(ParquetError::oos("Dictionary index is out of range")); + } + + let value = self.dictionary.value(index as usize); + let (start, _) = self.dictionary.offsets().start_end(index as usize); + Ok(View::new_from_bytes(value, self.buffer_idx, start as u32)) + } + + fn translate_slice(&self, target: &mut Vec, source: &[u32]) -> ParquetResult<()> { + let Some(source_max) = source.iter().copied().max() else { + return Ok(()); + }; + + if source_max as usize >= self.dictionary.len() { + return Err(ParquetError::oos("Dictionary index is out of range")); + } + + let offsets = self.dictionary.offsets(); + + target.extend(source.iter().map(|&src_idx| { + // Safety: We have checked before that source only has indexes that are smaller than + // the dictionary length. + let value = unsafe { self.dictionary.value_unchecked(src_idx as usize) }; + debug_assert!((src_idx as usize) < offsets.len_proxy()); + let (start, _) = unsafe { offsets.start_end_unchecked(src_idx as usize) }; + View::new_from_bytes(value, self.buffer_idx, start as u32) + })); + + Ok(()) + } +} + /// A closure-based translator pub struct FnTranslator ParquetResult>(pub F); diff --git a/crates/polars/tests/it/io/parquet/read/binary.rs b/crates/polars/tests/it/io/parquet/read/binary.rs index a7a7eb4c4e36..724e7d791c42 100644 --- a/crates/polars/tests/it/io/parquet/read/binary.rs +++ b/crates/polars/tests/it/io/parquet/read/binary.rs @@ -1,3 +1,4 @@ +use polars_parquet::parquet::encoding::hybrid_rle::FnTranslator; use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::page::DataPage; @@ -22,10 +23,11 @@ pub fn page_to_vec( .map(Some) .map(|x| x.transpose()) .collect(), - FixedLenBinaryPageState::RequiredDictionary(dict) => dict - .indexes - .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec()).map(Some)) - .collect(), + FixedLenBinaryPageState::RequiredDictionary(dict) => { + let dictionary = + FnTranslator(|v| dict.dict.value(v as usize).map(|v| Some(v.to_vec()))); + dict.indexes.translate_and_collect(&dictionary) + }, FixedLenBinaryPageState::OptionalDictionary(validity, dict) => { let values = dict .indexes diff --git a/crates/polars/tests/it/io/parquet/read/primitive.rs b/crates/polars/tests/it/io/parquet/read/primitive.rs index 825cdca48526..f9c47ace5679 100644 --- a/crates/polars/tests/it/io/parquet/read/primitive.rs +++ b/crates/polars/tests/it/io/parquet/read/primitive.rs @@ -1,8 +1,9 @@ use polars_parquet::parquet::deserialize::{ HybridRleDecoderIter, HybridRleIter, SliceFilteredIter, }; -use polars_parquet::parquet::encoding::hybrid_rle::Decoder; +use polars_parquet::parquet::encoding::hybrid_rle::{Decoder, FnTranslator}; use polars_parquet::parquet::encoding::Encoding; +use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::page::{split_buffer, DataPage, EncodedSplitBuffer}; use polars_parquet::parquet::schema::Repetition; use polars_parquet::parquet::types::NativeType; @@ -90,7 +91,7 @@ impl<'a, T: NativeType> PageState<'a, T> { pub fn page_to_vec( page: &DataPage, dict: Option<&PrimitivePageDict>, -) -> Result>, ParquetError> { +) -> ParquetResult>> { assert_eq!(page.descriptor.max_rep_level, 0); let state = PageState::::try_new(page, dict)?; @@ -100,10 +101,10 @@ pub fn page_to_vec( deserialize_optional(validity, values.by_ref().map(Ok)) }, NativePageState::Required(values) => Ok(values.map(Some).collect()), - NativePageState::RequiredDictionary(dict) => dict - .indexes - .map(|x| dict.dict.value(x as usize).copied().map(Some)) - .collect(), + NativePageState::RequiredDictionary(dict) => { + let dictionary = FnTranslator(|x| dict.dict.value(x as usize).copied().map(Some)); + dict.indexes.translate_and_collect(&dictionary) + }, NativePageState::OptionalDictionary(validity, dict) => { let values = dict.indexes.map(|x| dict.dict.value(x as usize).copied()); deserialize_optional(validity, values)