diff --git a/crates/polars-arrow/src/compute/cast/binary_to.rs b/crates/polars-arrow/src/compute/cast/binary_to.rs index c7970fe6a051..d5e8bfb30852 100644 --- a/crates/polars-arrow/src/compute/cast/binary_to.rs +++ b/crates/polars-arrow/src/compute/cast/binary_to.rs @@ -139,6 +139,7 @@ pub fn binary_to_dictionary( from: &BinaryArray, ) -> PolarsResult> { let mut array = MutableDictionaryArray::>::new(); + array.reserve(from.len()); array.try_extend(from.iter())?; Ok(array.into()) diff --git a/crates/polars-arrow/src/compute/cast/binview_to.rs b/crates/polars-arrow/src/compute/cast/binview_to.rs index 8c7ef4c2453a..1c157110ec49 100644 --- a/crates/polars-arrow/src/compute/cast/binview_to.rs +++ b/crates/polars-arrow/src/compute/cast/binview_to.rs @@ -21,6 +21,7 @@ pub(super) fn binview_to_dictionary( from: &BinaryViewArray, ) -> PolarsResult> { let mut array = MutableDictionaryArray::>::new(); + array.reserve(from.len()); array.try_extend(from.iter())?; Ok(array.into()) @@ -30,6 +31,7 @@ pub(super) fn utf8view_to_dictionary( from: &Utf8ViewArray, ) -> PolarsResult> { let mut array = MutableDictionaryArray::>::new(); + array.reserve(from.len()); array.try_extend(from.iter())?; Ok(array.into()) diff --git a/crates/polars-arrow/src/compute/cast/primitive_to.rs b/crates/polars-arrow/src/compute/cast/primitive_to.rs index d0d2056b70de..583b6ab19a96 100644 --- a/crates/polars-arrow/src/compute/cast/primitive_to.rs +++ b/crates/polars-arrow/src/compute/cast/primitive_to.rs @@ -318,6 +318,7 @@ pub fn primitive_to_dictionary( let mut array = MutableDictionaryArray::::try_empty(MutablePrimitiveArray::::from( from.data_type().clone(), ))?; + array.reserve(from.len()); array.try_extend(iter)?; Ok(array.into()) diff --git a/crates/polars-arrow/src/compute/cast/utf8_to.rs b/crates/polars-arrow/src/compute/cast/utf8_to.rs index 4df2876d394e..85b478c43817 100644 --- a/crates/polars-arrow/src/compute/cast/utf8_to.rs +++ b/crates/polars-arrow/src/compute/cast/utf8_to.rs @@ -27,6 +27,7 @@ pub fn utf8_to_dictionary( from: &Utf8Array, ) -> PolarsResult> { let mut array = MutableDictionaryArray::>::new(); + array.reserve(from.len()); array.try_extend(from.iter())?; Ok(array.into()) diff --git a/crates/polars-io/src/parquet/write/writer.rs b/crates/polars-io/src/parquet/write/writer.rs index 2408d66e9ba2..620ac11c3351 100644 --- a/crates/polars-io/src/parquet/write/writer.rs +++ b/crates/polars-io/src/parquet/write/writer.rs @@ -102,7 +102,7 @@ where WriteOptions { write_statistics: self.statistics, compression: self.compression, - version: Version::V2, + version: Version::V1, data_pagesize_limit: self.data_page_size, } } diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs index b3ea666865c9..0525578589eb 100644 --- a/crates/polars-parquet/src/arrow/write/dictionary.rs +++ b/crates/polars-parquet/src/arrow/write/dictionary.rs @@ -1,7 +1,6 @@ use arrow::array::{Array, BinaryViewArray, DictionaryArray, DictionaryKey, Utf8ViewArray}; use arrow::bitmap::{Bitmap, MutableBitmap}; use arrow::datatypes::{ArrowDataType, IntegerType}; -use num_traits::ToPrimitive; use polars_error::{polars_bail, PolarsResult}; use super::binary::{ @@ -16,23 +15,19 @@ use super::primitive::{ use super::{binview, nested, Nested, WriteOptions}; use crate::arrow::read::schema::is_nullable; use crate::arrow::write::{slice_nested_leaf, utils}; -use crate::parquet::encoding::hybrid_rle::encode_u32; +use crate::parquet::encoding::hybrid_rle::encode; use crate::parquet::encoding::Encoding; use crate::parquet::page::{DictPage, Page}; use crate::parquet::schema::types::PrimitiveType; use crate::parquet::statistics::{serialize_statistics, ParquetStatistics}; -use crate::write::{to_nested, DynIter, ParquetType}; +use crate::write::DynIter; pub(crate) fn encode_as_dictionary_optional( array: &dyn Array, + nested: &[Nested], type_: PrimitiveType, options: WriteOptions, ) -> Option>>> { - let nested = to_nested(array, &ParquetType::PrimitiveType(type_.clone())) - .ok()? - .pop() - .unwrap(); - let dtype = Box::new(array.data_type().clone()); let len_before = array.len(); @@ -52,35 +47,11 @@ pub(crate) fn encode_as_dictionary_optional( if (array.values().len() as f64) / (len_before as f64) > 0.75 { return None; } - if array.values().len().to_u16().is_some() { - let array = arrow::compute::cast::cast( - array, - &ArrowDataType::Dictionary( - IntegerType::UInt16, - Box::new(array.values().data_type().clone()), - false, - ), - Default::default(), - ) - .unwrap(); - - let array = array - .as_any() - .downcast_ref::>() - .unwrap(); - return Some(array_to_pages( - array, - type_, - &nested, - options, - Encoding::RleDictionary, - )); - } Some(array_to_pages( array, type_, - &nested, + nested, options, Encoding::RleDictionary, )) @@ -116,7 +87,7 @@ fn serialize_keys_values( buffer.push(num_bits as u8); // followed by the encoded indices. - Ok(encode_u32(buffer, keys, num_bits)?) + Ok(encode::(buffer, keys, num_bits)?) } else { let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64); @@ -124,7 +95,7 @@ fn serialize_keys_values( buffer.push(num_bits as u8); // followed by the encoded indices. - Ok(encode_u32(buffer, keys, num_bits)?) + Ok(encode::(buffer, keys, num_bits)?) } } diff --git a/crates/polars-parquet/src/arrow/write/mod.rs b/crates/polars-parquet/src/arrow/write/mod.rs index d992a3f08a8e..f90ca2482fb4 100644 --- a/crates/polars-parquet/src/arrow/write/mod.rs +++ b/crates/polars-parquet/src/arrow/write/mod.rs @@ -219,7 +219,7 @@ pub fn array_to_pages( // Only take this path for primitive columns if matches!(nested.first(), Some(Nested::Primitive(_, _, _))) { if let Some(result) = - encode_as_dictionary_optional(primitive_array, type_.clone(), options) + encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options) { return result; } diff --git a/crates/polars-parquet/src/arrow/write/nested/mod.rs b/crates/polars-parquet/src/arrow/write/nested/mod.rs index 46e15eec6c72..9aed392a06ee 100644 --- a/crates/polars-parquet/src/arrow/write/nested/mod.rs +++ b/crates/polars-parquet/src/arrow/write/nested/mod.rs @@ -6,7 +6,7 @@ use polars_error::PolarsResult; pub use rep::num_values; use super::Nested; -use crate::parquet::encoding::hybrid_rle::encode_u32; +use crate::parquet::encoding::hybrid_rle::encode; use crate::parquet::read::levels::get_bit_width; use crate::parquet::write::Version; @@ -41,12 +41,12 @@ fn write_rep_levels(buffer: &mut Vec, nested: &[Nested], version: Version) - match version { Version::V1 => { write_levels_v1(buffer, |buffer: &mut Vec| { - encode_u32(buffer, levels, num_bits)?; + encode::(buffer, levels, num_bits)?; Ok(()) })?; }, Version::V2 => { - encode_u32(buffer, levels, num_bits)?; + encode::(buffer, levels, num_bits)?; }, } @@ -65,10 +65,10 @@ fn write_def_levels(buffer: &mut Vec, nested: &[Nested], version: Version) - match version { Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec| { - encode_u32(buffer, levels, num_bits)?; + encode::(buffer, levels, num_bits)?; Ok(()) }), - Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?), + Version::V2 => Ok(encode::(buffer, levels, num_bits)?), } } diff --git a/crates/polars-parquet/src/arrow/write/utils.rs b/crates/polars-parquet/src/arrow/write/utils.rs index 2032029b2de4..0ba9f4289bab 100644 --- a/crates/polars-parquet/src/arrow/write/utils.rs +++ b/crates/polars-parquet/src/arrow/write/utils.rs @@ -4,7 +4,7 @@ use polars_error::*; use super::{Version, WriteOptions}; use crate::parquet::compression::CompressionOptions; -use crate::parquet::encoding::hybrid_rle::encode_bool; +use crate::parquet::encoding::hybrid_rle::encode; use crate::parquet::encoding::Encoding; use crate::parquet::metadata::Descriptor; use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}; @@ -14,7 +14,7 @@ use crate::parquet::statistics::ParquetStatistics; fn encode_iter_v1>(buffer: &mut Vec, iter: I) -> PolarsResult<()> { buffer.extend_from_slice(&[0; 4]); let start = buffer.len(); - encode_bool(buffer, iter)?; + encode::(buffer, iter, 1)?; let end = buffer.len(); let length = end - start; @@ -25,7 +25,7 @@ fn encode_iter_v1>(buffer: &mut Vec, iter: I) -> Po } fn encode_iter_v2>(writer: &mut Vec, iter: I) -> PolarsResult<()> { - Ok(encode_bool(writer, iter)?) + Ok(encode::(writer, iter, 1)?) } fn encode_iter>( diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs index 1c4dd67ccec7..963499cf324f 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs @@ -3,98 +3,216 @@ use std::io::Write; use super::bitpacked_encode; use crate::parquet::encoding::{bitpacked, ceil8, uleb128}; -/// RLE-hybrid encoding of `u32`. This currently only yields bitpacked values. -pub fn encode_u32>( - writer: &mut W, - iterator: I, - num_bits: u32, -) -> std::io::Result<()> { - let num_bits = num_bits as u8; - // the length of the iterator. - let length = iterator.size_hint().1.unwrap(); +// Arbitrary value that balances memory usage and storage overhead +const MAX_VALUES_PER_LITERAL_RUN: usize = (1 << 10) * 8; + +pub trait Encoder { + fn bitpacked_encode>( + writer: &mut W, + iterator: I, + num_bits: usize, + ) -> std::io::Result<()>; + + fn run_length_encode( + writer: &mut W, + run_length: usize, + value: T, + bit_width: u32, + ) -> std::io::Result<()>; +} - // write the length + indicator - let mut header = ceil8(length) as u64; - header <<= 1; - header |= 1; // it is bitpacked => first bit is set - let mut container = [0; 10]; - let used = uleb128::encode(header, &mut container); - writer.write_all(&container[..used])?; +const U32_BLOCK_LEN: usize = 32; - bitpacked_encode_u32(writer, iterator, num_bits as usize)?; +impl Encoder for u32 { + fn bitpacked_encode>( + writer: &mut W, + mut iterator: I, + num_bits: usize, + ) -> std::io::Result<()> { + // the length of the iterator. + let length = iterator.size_hint().1.unwrap(); + + let mut header = ceil8(length) as u64; + header <<= 1; + header |= 1; // it is bitpacked => first bit is set + let mut container = [0; 10]; + let used = uleb128::encode(header, &mut container); + writer.write_all(&container[..used])?; + + let chunks = length / U32_BLOCK_LEN; + let remainder = length - chunks * U32_BLOCK_LEN; + let mut buffer = [0u32; U32_BLOCK_LEN]; + + // simplified from ceil8(U32_BLOCK_LEN * num_bits) since U32_BLOCK_LEN = 32 + let compressed_chunk_size = 4 * num_bits; + + for _ in 0..chunks { + iterator + .by_ref() + .take(U32_BLOCK_LEN) + .zip(buffer.iter_mut()) + .for_each(|(item, buf)| *buf = item); + + let mut packed = [0u8; 4 * U32_BLOCK_LEN]; + bitpacked::encode_pack::(&buffer, num_bits, packed.as_mut()); + writer.write_all(&packed[..compressed_chunk_size])?; + } + + if remainder != 0 { + // Must be careful here to ensure we write a multiple of `num_bits` + // (the bit width) to align with the spec. Some readers also rely on + // this - see https://github.com/pola-rs/polars/pull/13883. + + // this is ceil8(remainder * num_bits), but we ensure the output is a + // multiple of num_bits by rewriting it as ceil8(remainder) * num_bits + let compressed_remainder_size = ceil8(remainder) * num_bits; + iterator + .by_ref() + .take(remainder) + .zip(buffer.iter_mut()) + .for_each(|(item, buf)| *buf = item); + + let mut packed = [0u8; 4 * U32_BLOCK_LEN]; + bitpacked::encode_pack(&buffer[..remainder], num_bits, packed.as_mut()); + writer.write_all(&packed[..compressed_remainder_size])?; + }; + Ok(()) + } - Ok(()) + fn run_length_encode( + writer: &mut W, + run_length: usize, + value: u32, + bit_width: u32, + ) -> std::io::Result<()> { + // write the length + indicator + let mut header = run_length as u64; + header <<= 1; + let mut container = [0; 10]; + let used = uleb128::encode(header, &mut container); + writer.write_all(&container[..used])?; + + let num_bytes = ceil8(bit_width as usize); + let bytes = value.to_le_bytes(); + writer.write_all(&bytes[..num_bytes])?; + Ok(()) + } } -const U32_BLOCK_LEN: usize = 32; - -fn bitpacked_encode_u32>( - writer: &mut W, - mut iterator: I, - num_bits: usize, -) -> std::io::Result<()> { - // the length of the iterator. - let length = iterator.size_hint().1.unwrap(); - - let chunks = length / U32_BLOCK_LEN; - let remainder = length - chunks * U32_BLOCK_LEN; - let mut buffer = [0u32; U32_BLOCK_LEN]; - - // simplified from ceil8(U32_BLOCK_LEN * num_bits) since U32_BLOCK_LEN = 32 - let compressed_chunk_size = 4 * num_bits; - - for _ in 0..chunks { - iterator - .by_ref() - .take(U32_BLOCK_LEN) - .zip(buffer.iter_mut()) - .for_each(|(item, buf)| *buf = item); - - let mut packed = [0u8; 4 * U32_BLOCK_LEN]; - bitpacked::encode_pack::(&buffer, num_bits, packed.as_mut()); - writer.write_all(&packed[..compressed_chunk_size])?; +impl Encoder for bool { + fn bitpacked_encode>( + writer: &mut W, + iterator: I, + _num_bits: usize, + ) -> std::io::Result<()> { + // the length of the iterator. + let length = iterator.size_hint().1.unwrap(); + + let mut header = ceil8(length) as u64; + header <<= 1; + header |= 1; // it is bitpacked => first bit is set + let mut container = [0; 10]; + let used = uleb128::encode(header, &mut container); + writer.write_all(&container[..used])?; + bitpacked_encode(writer, iterator)?; + Ok(()) } - if remainder != 0 { - // Must be careful here to ensure we write a multiple of `num_bits` - // (the bit width) to align with the spec. Some readers also rely on - // this - see https://github.com/pola-rs/polars/pull/13883. - - // this is ceil8(remainder * num_bits), but we ensure the output is a - // multiple of num_bits by rewriting it as ceil8(remainder) * num_bits - let compressed_remainder_size = ceil8(remainder) * num_bits; - iterator - .by_ref() - .take(remainder) - .zip(buffer.iter_mut()) - .for_each(|(item, buf)| *buf = item); - - let mut packed = [0u8; 4 * U32_BLOCK_LEN]; - bitpacked::encode_pack(&buffer, num_bits, packed.as_mut()); - writer.write_all(&packed[..compressed_remainder_size])?; - }; - Ok(()) + fn run_length_encode( + writer: &mut W, + run_length: usize, + value: bool, + _bit_width: u32, + ) -> std::io::Result<()> { + // write the length + indicator + let mut header = run_length as u64; + header <<= 1; + let mut container = [0; 10]; + let used = uleb128::encode(header, &mut container); + writer.write_all(&container[..used])?; + writer.write_all(&(value as u8).to_le_bytes())?; + Ok(()) + } } -/// the bitpacked part of the encoder. -pub fn encode_bool>( +#[allow(clippy::comparison_chain)] +pub fn encode, W: Write, I: Iterator>( writer: &mut W, iterator: I, + num_bits: u32, ) -> std::io::Result<()> { - // the length of the iterator. - let length = iterator.size_hint().1.unwrap(); - - // write the length + indicator - let mut header = ceil8(length) as u64; - header <<= 1; - header |= 1; // it is bitpacked => first bit is set - let mut container = [0; 10]; - let used = uleb128::encode(header, &mut container); - - writer.write_all(&container[..used])?; - - // encode the iterator - bitpacked_encode(writer, iterator) + let mut consecutive_repeats: usize = 0; + let mut previous_val = T::default(); + let mut buffered_bits = [previous_val; MAX_VALUES_PER_LITERAL_RUN]; + let mut buffer_idx = 0; + let mut literal_run_idx = 0; + for val in iterator { + if val == previous_val { + consecutive_repeats += 1; + // Run is long enough to RLE, no need to buffer values + if consecutive_repeats >= 8 { + // Run is long enough to RLE, no need to buffer values + if consecutive_repeats > 8 { + continue; + } else { + // Ensure literal run has multiple of 8 values + // Take from consecutive repeats if needed to pad up + let literal_padding = (8 - (literal_run_idx % 8)) % 8; + consecutive_repeats -= literal_padding; + literal_run_idx += literal_padding; + } + } + // Too short to RLE, continue to buffer values + } else if consecutive_repeats > 8 { + // Flush literal run, if any, before RLE run + if literal_run_idx > 0 { + T::bitpacked_encode( + writer, + buffered_bits.iter().copied().take(literal_run_idx), + num_bits as usize, + )?; + literal_run_idx = 0; + } + T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?; + consecutive_repeats = 1; + buffer_idx = 0; + } else { + // Not enough consecutive repeats to RLE, extend literal run + literal_run_idx = buffer_idx; + consecutive_repeats = 1; + } + // If buffer is full, bit-pack as literal run and reset + if buffer_idx == MAX_VALUES_PER_LITERAL_RUN { + T::bitpacked_encode( + writer, + buffered_bits.iter().copied().take(literal_run_idx), + num_bits as usize, + )?; + // Consecutive repeats may be consolidated into literal run + consecutive_repeats -= buffer_idx - literal_run_idx; + buffer_idx = 0; + literal_run_idx = 0; + } + buffered_bits[buffer_idx] = val; + previous_val = val; + buffer_idx += 1; + } + // Not enough consecutive repeats to RLE, extend literal run + if consecutive_repeats <= 8 { + literal_run_idx = buffer_idx; + consecutive_repeats = 0; + } + if literal_run_idx > 0 { + T::bitpacked_encode( + writer, + buffered_bits.iter().copied().take(literal_run_idx), + num_bits as usize, + )?; + } + if consecutive_repeats > 8 { + T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?; + } + Ok(()) } #[cfg(test)] @@ -108,7 +226,7 @@ mod tests { let mut vec = vec![]; - encode_bool(&mut vec, iter)?; + encode::(&mut vec, iter, 1)?; assert_eq!(vec, vec![(2 << 1 | 1), 0b10011101u8, 0b00011101]); @@ -119,9 +237,10 @@ mod tests { fn bool_from_iter() -> std::io::Result<()> { let mut vec = vec![]; - encode_bool( + encode::( &mut vec, vec![true, true, true, true, true, true, true, true].into_iter(), + 1, )?; assert_eq!(vec, vec![(1 << 1 | 1), 0b11111111]); @@ -132,7 +251,7 @@ mod tests { fn test_encode_u32() -> std::io::Result<()> { let mut vec = vec![]; - encode_u32(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?; + encode::(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?; assert_eq!( vec, @@ -153,7 +272,7 @@ mod tests { let values = (0..128).map(|x| x % 4); - encode_u32(&mut vec, values, 2)?; + encode::(&mut vec, values, 2)?; let length = 128; let expected = 0b11_10_01_00u8; @@ -170,7 +289,7 @@ mod tests { let values = vec![3, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3].into_iter(); let mut vec = vec![]; - encode_u32(&mut vec, values, 2)?; + encode::(&mut vec, values, 2)?; let expected = vec![5, 207, 254, 247, 51]; assert_eq!(expected, vec); diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs index 3dc072552524..89816f87fb54 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs @@ -4,7 +4,7 @@ mod decoder; mod encoder; pub use bitmap::{encode_bool as bitpacked_encode, BitmapIter}; pub use decoder::Decoder; -pub use encoder::{encode_bool, encode_u32}; +pub use encoder::encode; use polars_utils::iter::FallibleIterator; use super::bitpacked; @@ -137,7 +137,7 @@ mod tests { let data = (0..1000).collect::>(); - encode_u32(&mut buffer, data.iter().cloned(), num_bits).unwrap(); + encode::(&mut buffer, data.iter().cloned(), num_bits).unwrap(); let decoder = HybridRleDecoder::try_new(&buffer, num_bits, data.len())?; diff --git a/crates/polars/tests/it/io/parquet/write/binary.rs b/crates/polars/tests/it/io/parquet/write/binary.rs index 3112f115c3e7..dd4e3a942c46 100644 --- a/crates/polars/tests/it/io/parquet/write/binary.rs +++ b/crates/polars/tests/it/io/parquet/write/binary.rs @@ -1,4 +1,4 @@ -use polars_parquet::parquet::encoding::hybrid_rle::encode_bool; +use polars_parquet::parquet::encoding::hybrid_rle::encode; use polars_parquet::parquet::encoding::Encoding; use polars_parquet::parquet::error::Result; use polars_parquet::parquet::metadata::Descriptor; @@ -25,7 +25,7 @@ fn unzip_option(array: &[Option>]) -> Result<(Vec, Vec)> { false } }); - encode_bool(&mut validity, iter)?; + encode::(&mut validity, iter, 1)?; // write the length, now that it is known let mut validity = validity.into_inner(); diff --git a/crates/polars/tests/it/io/parquet/write/primitive.rs b/crates/polars/tests/it/io/parquet/write/primitive.rs index 3b5ae150896a..e5da32252e99 100644 --- a/crates/polars/tests/it/io/parquet/write/primitive.rs +++ b/crates/polars/tests/it/io/parquet/write/primitive.rs @@ -1,4 +1,4 @@ -use polars_parquet::parquet::encoding::hybrid_rle::encode_bool; +use polars_parquet::parquet::encoding::hybrid_rle::encode; use polars_parquet::parquet::encoding::Encoding; use polars_parquet::parquet::error::Result; use polars_parquet::parquet::metadata::Descriptor; @@ -24,7 +24,7 @@ fn unzip_option(array: &[Option]) -> Result<(Vec, Vec) false } }); - encode_bool(&mut validity, iter)?; + encode::(&mut validity, iter, 1)?; // write the length, now that it is known let mut validity = validity.into_inner(); diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 12ac1a835b40..846b4252e548 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -892,3 +892,46 @@ def test_no_glob_windows(tmp_path: Path) -> None: df.write_parquet(str(p2)) assert_frame_equal(pl.scan_parquet(str(p1), glob=False).collect(), df) + + +@pytest.mark.slow() +def test_hybrid_rle() -> None: + df = pl.DataFrame( + { + # Test primitive types + "i64": pl.repeat(int(2**63 - 1), n=10000, dtype=pl.Int64, eager=True), + "u64": pl.repeat(int(2**64 - 1), n=10000, dtype=pl.UInt64, eager=True), + "i8": pl.repeat(-int(2**7 - 1), n=10000, dtype=pl.Int8, eager=True), + "u8": pl.repeat(int(2**8 - 1), n=10000, dtype=pl.UInt8, eager=True), + "string": pl.repeat("a", n=10000, dtype=pl.String, eager=True), + "categorical": pl.Series((["a"] * 9 + ["b"]) * 1000, dtype=pl.Categorical), + # Test filling up bit-packing buffer + "large_bit_pack": ([0] * 5 + [1] * 5) * 1000, + # Test mix of bit-packed and RLE runs + "bit_pack_and_rle": ( + [0] + [1] * 19 + [2] * 8 + [3] * 12 + [4] * 5 + [5] * 5 + ) + * 200, + # Test some null values + "nulls_included": ( + [None] + [1] * 19 + [None] * 8 + [3] * 12 + [4] * 5 + [None] * 5 + ) + * 200, + # Test filling up bit-packing buffer for encode_bool, + # which is only used to encode validities + # Also checks that runs are handled correctly if buffer + # is flushed (at MAX_VALUES_PER_LITERAL_RUN values) + "large_bit_pack_validity": [0, None] * 4092 + + [0] * 9 + + [1] * 9 + + [2] * 10 + + [0] * 1788, + } + ) + f = io.BytesIO() + df.write_parquet(f) + f.seek(0) + for column in pq.ParquetFile(f).metadata.to_dict()["row_groups"][0]["columns"]: + assert "RLE_DICTIONARY" in column["encodings"] + f.seek(0) + assert_frame_equal(pl.read_parquet(f), df)