Skip to content

Commit

Permalink
perf: Collect Parquet dictionary binary as view (#17475)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jul 8, 2024
1 parent ccfacc8 commit b347717
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 59 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::array::iterator::NonNullValuesIter;
use crate::bitmap::utils::{BitmapIter, ZipValidity};
pub type BinaryViewArray = BinaryViewArrayGeneric<[u8]>;
pub type Utf8ViewArray = BinaryViewArrayGeneric<str>;
pub use view::{View, INLINE_VIEW_SIZE};
pub use view::View;

use super::Splitable;

Expand Down
12 changes: 12 additions & 0 deletions crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,18 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
self.views.push(value);
}

#[inline]
pub fn push_buffer(&mut self, buffer: Buffer<u8>) -> u32 {
if !self.in_progress_buffer.is_empty() {
self.completed_buffers
.push(Buffer::from(std::mem::take(&mut self.in_progress_buffer)));
}

let buffer_idx = self.completed_buffers.len();
self.completed_buffers.push(buffer);
buffer_idx as u32
}

#[inline]
pub fn push_value<V: AsRef<T>>(&mut self, value: V) {
if let Some(validity) = &mut self.validity {
Expand Down
86 changes: 68 additions & 18 deletions crates/polars-arrow/src/array/binview/view.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::fmt::{self, Display, Formatter};
use std::ops::Add;

use bytemuck::{Pod, Zeroable};
Expand All @@ -13,10 +13,12 @@ use crate::buffer::Buffer;
use crate::datatypes::PrimitiveType;
use crate::types::NativeType;

pub const INLINE_VIEW_SIZE: u32 = 12;

// We use this instead of u128 because we want alignment of <= 8 bytes.
#[derive(Debug, Copy, Clone, Default)]
/// A reference to a set of bytes.
///
/// If `length <= 12`, these bytes are inlined over the `prefix`, `buffer_idx` and `offset` fields.
/// If `length > 12`, these fields specify a slice of a buffer.
#[derive(Copy, Clone, Default)]
#[repr(C)]
pub struct View {
/// The length of the string/bytes.
Expand All @@ -29,29 +31,77 @@ pub struct View {
pub offset: u32,
}

impl fmt::Debug for View {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.length <= Self::MAX_INLINE_SIZE {
fmt.debug_struct("View")
.field("length", &self.length)
.field("content", &unsafe {
std::slice::from_raw_parts(
(self as *const _ as *const u8).add(4),
self.length as usize,
)
})
.finish()
} else {
fmt.debug_struct("View")
.field("length", &self.length)
.field("prefix", &self.prefix.to_be_bytes())
.field("buffer_idx", &self.buffer_idx)
.field("offset", &self.offset)
.finish()
}
}
}

impl View {
pub const MAX_INLINE_SIZE: u32 = 12;

#[inline(always)]
pub fn as_u128(self) -> u128 {
unsafe { std::mem::transmute(self) }
}

/// Create a new inline view
///
/// # Panics
///
/// Panics if the `bytes.len() > View::MAX_INLINE_SIZE`.
#[inline]
pub fn new_inline(bytes: &[u8]) -> Self {
debug_assert!(bytes.len() <= u32::MAX as usize);
assert!(bytes.len() as u32 <= Self::MAX_INLINE_SIZE);

let mut view = Self {
length: bytes.len() as u32,
..Default::default()
};

let view_ptr = &mut view as *mut _ as *mut u8;

// SAFETY:
// - bytes length <= 12,
// - size_of::<View> == 16
// - View is laid out as [length, prefix, buffer_idx, offset] (using repr(C))
// - By grabbing the view_ptr and adding 4, we have provenance over prefix, buffer_idx and
// offset. (i.e. the same could not be achieved with &mut self.prefix as *mut _ as *mut u8)
unsafe {
let inline_data_ptr = view_ptr.add(4);
core::ptr::copy_nonoverlapping(bytes.as_ptr(), inline_data_ptr, bytes.len());
}
view
}

#[inline]
pub fn new_from_bytes(bytes: &[u8], buffer_idx: u32, offset: u32) -> Self {
if bytes.len() <= 12 {
let mut ret = Self {
length: bytes.len() as u32,
..Default::default()
};
let ret_ptr = &mut ret as *mut _ as *mut u8;
unsafe {
core::ptr::copy_nonoverlapping(bytes.as_ptr(), ret_ptr.add(4), bytes.len());
}
ret
debug_assert!(bytes.len() <= u32::MAX as usize);

if bytes.len() as u32 <= Self::MAX_INLINE_SIZE {
Self::new_inline(bytes)
} else {
let prefix_buf: [u8; 4] = std::array::from_fn(|i| *bytes.get(i).unwrap_or(&0));
Self {
length: bytes.len() as u32,
prefix: u32::from_le_bytes(prefix_buf),
prefix: u32::from_le_bytes(bytes[0..4].try_into().unwrap()),
buffer_idx,
offset,
}
Expand Down Expand Up @@ -190,8 +240,8 @@ where
{
for view in views {
let len = view.length;
if len <= INLINE_VIEW_SIZE {
if len < INLINE_VIEW_SIZE && view.as_u128() >> (32 + len * 8) != 0 {
if len <= View::MAX_INLINE_SIZE {
if len < View::MAX_INLINE_SIZE && view.as_u128() >> (32 + len * 8) != 0 {
polars_bail!(ComputeError: "view contained non-zero padding in prefix");
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ mod values;
pub use binary::{BinaryArray, BinaryValueIter, MutableBinaryArray, MutableBinaryValuesArray};
pub use binview::{
BinaryViewArray, BinaryViewArrayGeneric, MutableBinaryViewArray, MutablePlBinary,
MutablePlString, Utf8ViewArray, View, ViewType, INLINE_VIEW_SIZE,
MutablePlString, Utf8ViewArray, View, ViewType,
};
pub use boolean::{BooleanArray, MutableBooleanArray};
pub use dictionary::{DictionaryArray, DictionaryKey, MutableDictionaryArray};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ pub(crate) fn deserialize_plain(values: &[u8], num_values: usize) -> BinaryDict
for v in all {
dict_values.push(v)
}

dict_values.into()
}

Expand Down
92 changes: 65 additions & 27 deletions crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use std::cell::Cell;
use std::collections::VecDeque;

use arrow::array::{Array, ArrayRef, BinaryViewArray, MutableBinaryViewArray, Utf8ViewArray};
use arrow::array::{Array, ArrayRef, BinaryViewArray, MutableBinaryViewArray, Utf8ViewArray, View};
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::{ArrowDataType, PhysicalType};
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use super::super::binary::decoders::*;
use crate::parquet::encoding::hybrid_rle::BinaryDictionaryTranslator;
use crate::parquet::error::ParquetError;
use crate::parquet::page::{DataPage, DictPage};
use crate::read::deserialize::utils;
use crate::read::deserialize::utils::{extend_from_decoder, next, DecodedState, MaybeNext};
use crate::read::deserialize::utils::{
self, extend_from_decoder, next, DecodedState, MaybeNext, TranslatedHybridRle,
};
use crate::read::{PagesIter, PrimitiveLogicalType};

type DecodedStateTuple = (MutableBinaryViewArray<[u8]>, MutableBitmap);
Expand Down Expand Up @@ -102,33 +105,78 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder {
BinaryState::OptionalDictionary(page_validity, page_values) => {
// Already done on the dict.
validate_utf8 = false;

let page_dict = &page_values.dict;
let offsets = page_dict.offsets();

// @NOTE: If there is no lengths (i.e. 0-1 offset), then we will have only nulls.
let max_length = offsets.lengths().max().unwrap_or(0);

// We do not have to push the buffer if all elements fit as inline views.
let buffer_idx = if max_length <= View::MAX_INLINE_SIZE as usize {
0
} else {
values.push_buffer(page_dict.values().clone())
};

// @NOTE: we could potentially use the View::new_inline function here, but that
// would require two collectors & two translators. So I don't think it is worth
// it.
let translator = BinaryDictionaryTranslator {
dictionary: page_dict,
buffer_idx,
};
let collector = TranslatedHybridRle::new(&mut page_values.values, &translator);

utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page_values
.values
.by_ref()
.map(|index| page_dict.value(index as usize)),
collector,
)?;
page_values.values.get_result()?;
},
BinaryState::RequiredDictionary(page) => {
// Already done on the dict.
validate_utf8 = false;

let page_dict = &page.dict;
let offsets = page_dict.offsets();

for x in page
.values
.by_ref()
.map(|index| page_dict.value(index as usize))
.take(additional)
{
values.push_value_ignore_validity(x)
if let Some(max_length) = offsets.lengths().max() {
// We do not have to push the buffer if all elements fit as inline views.
let buffer_idx = if max_length <= View::MAX_INLINE_SIZE as usize {
0
} else {
values.push_buffer(page_dict.values().clone())
};

// @NOTE: we could potentially use the View::new_inline function here, but that
// would require two collectors & two translators. So I don't think it is worth
// it.
let translator = BinaryDictionaryTranslator {
dictionary: page_dict,
buffer_idx,
};

page.values.translate_and_collect_n_into(
values.views_mut(),
additional,
&translator,
)?;
if let Some(validity) = values.validity() {
validity.extend_constant(additional, true);
}
} else {
// @NOTE: If there are no dictionary items, there is no way we can look up
// items.
if additional != 0 {
return Err(ParquetError::oos(
"Attempt to search items with empty dictionary",
)
.into());
}
}
page.values.get_result()?;
},
BinaryState::FilteredOptional(page_validity, page_values) => {
extend_from_decoder(
Expand Down Expand Up @@ -273,17 +321,7 @@ pub(super) fn finish(
}

match data_type.to_physical_type() {
PhysicalType::BinaryView => unsafe {
Ok(BinaryViewArray::new_unchecked(
data_type.clone(),
array.views().clone(),
array.data_buffers().clone(),
array.validity().cloned(),
array.total_bytes_len(),
array.total_buffer_len(),
)
.boxed())
},
PhysicalType::BinaryView => Ok(array.boxed()),
PhysicalType::Utf8View => {
// SAFETY: we already checked utf8
unsafe {
Expand Down
41 changes: 40 additions & 1 deletion crates/polars-parquet/src/arrow/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::VecDeque;

use arrow::array::{MutableBinaryViewArray, View};
use arrow::bitmap::utils::BitmapIter;
use arrow::bitmap::MutableBitmap;
use arrow::pushable::Pushable;
Expand Down Expand Up @@ -322,7 +323,7 @@ fn reserve_pushable_and_validity<'a, I, T, C: BatchableCollector<I, T>>(
}

/// Extends a [`Pushable`] from an iterator of non-null values and an hybrid-rle decoder
pub(super) fn extend_from_decoder<I, T: std::fmt::Debug, C: BatchableCollector<I, T>>(
pub(super) fn extend_from_decoder<I, T, C: BatchableCollector<I, T>>(
validity: &mut MutableBitmap,
page_validity: &mut dyn PageValidity,
limit: Option<usize>,
Expand Down Expand Up @@ -431,6 +432,44 @@ where
}
}

impl<'a, 'b, 'c, T> BatchableCollector<u32, MutableBinaryViewArray<[u8]>>
for TranslatedHybridRle<'a, 'b, 'c, View, T>
where
T: Translator<View>,
{
#[inline]
fn reserve(target: &mut MutableBinaryViewArray<[u8]>, n: usize) {
target.reserve(n);
}

#[inline]
fn push_n(&mut self, target: &mut MutableBinaryViewArray<[u8]>, n: usize) -> ParquetResult<()> {
self.decoder
.translate_and_collect_n_into(target.views_mut(), n, self.translator)?;

if let Some(validity) = target.validity() {
validity.extend_constant(n, true);
}

Ok(())
}

#[inline]
fn push_n_nulls(
&mut self,
target: &mut MutableBinaryViewArray<[u8]>,
n: usize,
) -> ParquetResult<()> {
target.extend_null(n);
Ok(())
}

#[inline]
fn skip_n(&mut self, n: usize) -> ParquetResult<()> {
self.decoder.skip_in_place(n)
}
}

impl<T, P: Pushable<T>, I: Iterator<Item = T>> BatchableCollector<T, P> for I {
#[inline]
fn reserve(target: &mut P, n: usize) {
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ pub use decoder::Decoder;
pub use encoder::encode;
use polars_utils::iter::FallibleIterator;
use polars_utils::slice::GetSaferUnchecked;
pub use translator::{DictionaryTranslator, Translator, UnitTranslator};
pub use translator::{
BinaryDictionaryTranslator, DictionaryTranslator, FnTranslator, Translator, UnitTranslator,
};

use self::buffered::HybridRleBuffered;
use super::{bitpacked, ceil8, uleb128};
Expand Down
Loading

0 comments on commit b347717

Please sign in to comment.