Skip to content

Commit

Permalink
feat(rust): Add RLE to RLE_DICTIONARY encoder (pola-rs#15959)
Browse files Browse the repository at this point in the history
Co-authored-by: ritchie <[email protected]>
  • Loading branch information
thalassemia and ritchie46 authored May 3, 2024
1 parent dc45fc0 commit 6730a72
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 136 deletions.
1 change: 1 addition & 0 deletions crates/polars-arrow/src/compute/cast/binary_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ pub fn binary_to_dictionary<O: Offset, K: DictionaryKey>(
from: &BinaryArray<O>,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableBinaryArray<O>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-arrow/src/compute/cast/binview_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(super) fn binview_to_dictionary<K: DictionaryKey>(
from: &BinaryViewArray,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableBinaryViewArray<[u8]>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand All @@ -30,6 +31,7 @@ pub(super) fn utf8view_to_dictionary<K: DictionaryKey>(
from: &Utf8ViewArray,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableBinaryViewArray<str>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ pub fn primitive_to_dictionary<T: NativeType + Eq + Hash, K: DictionaryKey>(
let mut array = MutableDictionaryArray::<K, _>::try_empty(MutablePrimitiveArray::<T>::from(
from.data_type().clone(),
))?;
array.reserve(from.len());
array.try_extend(iter)?;

Ok(array.into())
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/compute/cast/utf8_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub fn utf8_to_dictionary<O: Offset, K: DictionaryKey>(
from: &Utf8Array<O>,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableUtf8Array<O>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ where
WriteOptions {
write_statistics: self.statistics,
compression: self.compression,
version: Version::V2,
version: Version::V1,
data_pagesize_limit: self.data_page_size,
}
}
Expand Down
41 changes: 6 additions & 35 deletions crates/polars-parquet/src/arrow/write/dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use arrow::array::{Array, BinaryViewArray, DictionaryArray, DictionaryKey, Utf8ViewArray};
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::{ArrowDataType, IntegerType};
use num_traits::ToPrimitive;
use polars_error::{polars_bail, PolarsResult};

use super::binary::{
Expand All @@ -16,23 +15,19 @@ use super::primitive::{
use super::{binview, nested, Nested, WriteOptions};
use crate::arrow::read::schema::is_nullable;
use crate::arrow::write::{slice_nested_leaf, utils};
use crate::parquet::encoding::hybrid_rle::encode_u32;
use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{DictPage, Page};
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::{serialize_statistics, ParquetStatistics};
use crate::write::{to_nested, DynIter, ParquetType};
use crate::write::DynIter;

pub(crate) fn encode_as_dictionary_optional(
array: &dyn Array,
nested: &[Nested],
type_: PrimitiveType,
options: WriteOptions,
) -> Option<PolarsResult<DynIter<'static, PolarsResult<Page>>>> {
let nested = to_nested(array, &ParquetType::PrimitiveType(type_.clone()))
.ok()?
.pop()
.unwrap();

let dtype = Box::new(array.data_type().clone());

let len_before = array.len();
Expand All @@ -52,35 +47,11 @@ pub(crate) fn encode_as_dictionary_optional(
if (array.values().len() as f64) / (len_before as f64) > 0.75 {
return None;
}
if array.values().len().to_u16().is_some() {
let array = arrow::compute::cast::cast(
array,
&ArrowDataType::Dictionary(
IntegerType::UInt16,
Box::new(array.values().data_type().clone()),
false,
),
Default::default(),
)
.unwrap();

let array = array
.as_any()
.downcast_ref::<DictionaryArray<u16>>()
.unwrap();
return Some(array_to_pages(
array,
type_,
&nested,
options,
Encoding::RleDictionary,
));
}

Some(array_to_pages(
array,
type_,
&nested,
nested,
options,
Encoding::RleDictionary,
))
Expand Down Expand Up @@ -116,15 +87,15 @@ fn serialize_keys_values<K: DictionaryKey>(
buffer.push(num_bits as u8);

// followed by the encoded indices.
Ok(encode_u32(buffer, keys, num_bits)?)
Ok(encode::<u32, _, _>(buffer, keys, num_bits)?)
} else {
let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64);

// num_bits as a single byte
buffer.push(num_bits as u8);

// followed by the encoded indices.
Ok(encode_u32(buffer, keys, num_bits)?)
Ok(encode::<u32, _, _>(buffer, keys, num_bits)?)
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub fn array_to_pages(
// Only take this path for primitive columns
if matches!(nested.first(), Some(Nested::Primitive(_, _, _))) {
if let Some(result) =
encode_as_dictionary_optional(primitive_array, type_.clone(), options)
encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
{
return result;
}
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-parquet/src/arrow/write/nested/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use polars_error::PolarsResult;
pub use rep::num_values;

use super::Nested;
use crate::parquet::encoding::hybrid_rle::encode_u32;
use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::read::levels::get_bit_width;
use crate::parquet::write::Version;

Expand Down Expand Up @@ -41,12 +41,12 @@ fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -
match version {
Version::V1 => {
write_levels_v1(buffer, |buffer: &mut Vec<u8>| {
encode_u32(buffer, levels, num_bits)?;
encode::<u32, _, _>(buffer, levels, num_bits)?;
Ok(())
})?;
},
Version::V2 => {
encode_u32(buffer, levels, num_bits)?;
encode::<u32, _, _>(buffer, levels, num_bits)?;
},
}

Expand All @@ -65,10 +65,10 @@ fn write_def_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -

match version {
Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec<u8>| {
encode_u32(buffer, levels, num_bits)?;
encode::<u32, _, _>(buffer, levels, num_bits)?;
Ok(())
}),
Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?),
Version::V2 => Ok(encode::<u32, _, _>(buffer, levels, num_bits)?),
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/polars-parquet/src/arrow/write/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use polars_error::*;

use super::{Version, WriteOptions};
use crate::parquet::compression::CompressionOptions;
use crate::parquet::encoding::hybrid_rle::encode_bool;
use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::metadata::Descriptor;
use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2};
Expand All @@ -14,7 +14,7 @@ use crate::parquet::statistics::ParquetStatistics;
fn encode_iter_v1<I: Iterator<Item = bool>>(buffer: &mut Vec<u8>, iter: I) -> PolarsResult<()> {
buffer.extend_from_slice(&[0; 4]);
let start = buffer.len();
encode_bool(buffer, iter)?;
encode::<bool, _, _>(buffer, iter, 1)?;
let end = buffer.len();
let length = end - start;

Expand All @@ -25,7 +25,7 @@ fn encode_iter_v1<I: Iterator<Item = bool>>(buffer: &mut Vec<u8>, iter: I) -> Po
}

fn encode_iter_v2<I: Iterator<Item = bool>>(writer: &mut Vec<u8>, iter: I) -> PolarsResult<()> {
Ok(encode_bool(writer, iter)?)
Ok(encode::<bool, _, _>(writer, iter, 1)?)
}

fn encode_iter<I: Iterator<Item = bool>>(
Expand Down
Loading

0 comments on commit 6730a72

Please sign in to comment.