diff --git a/protos/encodings.proto b/protos/encodings.proto index 6ad252dece..ac66d13dfa 100644 --- a/protos/encodings.proto +++ b/protos/encodings.proto @@ -211,6 +211,11 @@ message BitpackedForNonNeg { Buffer buffer = 3; } +message Bitpack2 { + // the number of bits of the uncompressed value. e.g. for a u32, this will be 32 + uint64 uncompressed_bits_per_value = 2; +} + // An array encoding for shredded structs that will never be null // // There is no actual data in this column. @@ -263,6 +268,7 @@ message ArrayEncoding { FixedSizeBinary fixed_size_binary = 11; BitpackedForNonNeg bitpacked_for_non_neg = 12; Constant constant = 13; + Bitpack2 bitpack2 = 14; } } diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml index 4cc94ae4be..56205f2f98 100644 --- a/rust/lance-encoding/Cargo.toml +++ b/rust/lance-encoding/Cargo.toml @@ -42,6 +42,7 @@ bytemuck = "=1.18.0" arrayref = "0.3.7" paste = "1.0.15" seq-macro = "0.3.5" +byteorder.workspace = true [dev-dependencies] lance-testing.workspace = true diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 25ff64e923..b6c72a5caa 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -246,6 +246,7 @@ use crate::encodings::logical::primitive::{ use crate::encodings::logical::r#struct::{ SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler, }; +use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor; use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor}; use crate::encodings::physical::{ColumnBuffers, FileBuffers}; use crate::format::pb::{self, column_encoding}; @@ -490,6 +491,9 @@ impl DecompressorStrategy for CoreDecompressorStrategy { pb::array_encoding::ArrayEncoding::Flat(flat) => { Ok(Box::new(ValueDecompressor::new(flat))) } + pb::array_encoding::ArrayEncoding::Bitpack2(description) => { + Ok(Box::new(BitpackMiniBlockDecompressor::new(description))) + } _ => todo!(), } } diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 837da5769c..3d3e267f2a 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -22,8 +22,10 @@ use crate::encodings::logical::blob::BlobFieldEncoder; use crate::encodings::logical::primitive::PrimitiveStructuralEncoder; use crate::encodings::logical::r#struct::StructFieldEncoder; use crate::encodings::logical::r#struct::StructStructuralEncoder; -use crate::encodings::physical::bitpack_fastlanes::compute_compressed_bit_width_for_non_neg; use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder; +use crate::encodings::physical::bitpack_fastlanes::{ + compute_compressed_bit_width_for_non_neg, BitpackMiniBlockEncoder, +}; use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme}; use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder; use crate::encodings::physical::fsst::FsstArrayEncoder; @@ -773,9 +775,18 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { fn create_miniblock_compressor( &self, field: &Field, - _data: &DataBlock, + data: &DataBlock, ) -> Result> { assert!(field.data_type().byte_width() > 0); + if let DataBlock::FixedWidth(ref fixed_width_data) = data { + if fixed_width_data.bits_per_value == 8 + || fixed_width_data.bits_per_value == 16 + || fixed_width_data.bits_per_value == 32 + || fixed_width_data.bits_per_value == 64 + { + return Ok(Box::new(BitpackMiniBlockEncoder::default())); + } + } Ok(Box::new(ValueEncoder::default())) } diff --git a/rust/lance-encoding/src/encodings/physical.rs b/rust/lance-encoding/src/encodings/physical.rs index cbedb5dd17..b349513efa 100644 --- a/rust/lance-encoding/src/encodings/physical.rs +++ b/rust/lance-encoding/src/encodings/physical.rs @@ -283,6 +283,7 @@ pub fn decoder_from_array_encoding( pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(), // 2.1 only pb::array_encoding::ArrayEncoding::Constant(_) => unreachable!(), + pb::array_encoding::ArrayEncoding::Bitpack2(_) => unreachable!(), } } diff --git a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs index 8900418e68..17d75fc4a9 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs @@ -8,6 +8,7 @@ use arrow::datatypes::{ }; use arrow_array::{Array, PrimitiveArray}; use arrow_schema::DataType; +use byteorder::{ByteOrder, LittleEndian}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use log::trace; @@ -20,12 +21,16 @@ use crate::buffer::LanceBuffer; use crate::compression_algo::fastlanes::BitPacking; use crate::data::{BlockInfo, UsedEncoding}; use crate::data::{DataBlock, FixedWidthDataBlock, NullableDataBlock}; -use crate::decoder::{PageScheduler, PrimitivePageDecoder}; -use crate::encoder::{ArrayEncoder, EncodedArray}; -use crate::format::ProtobufUtils; +use crate::decoder::{MiniBlockDecompressor, PageScheduler, PrimitivePageDecoder}; +use crate::encoder::{ + ArrayEncoder, EncodedArray, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, +}; +use crate::format::{pb, ProtobufUtils}; +use crate::statistics::{GetStat, Stat}; use arrow::array::ArrayRef; use bytemuck::cast_slice; const ELEMS_PER_CHUNK: u64 = 1024; +const LOG_ELEMS_PER_CHUNK: u8 = 10; // Compute the compressed_bit_width for a given array of integers // todo: compute all statistics before encoding @@ -1560,3 +1565,247 @@ mod tests { // check_round_trip_bitpacked(arr).await; // } } + +// This macro chunks the FixedWidth DataBlock, bitpacks them with 1024 values per chunk, +// it puts the bit-width parameter in front of each chunk, +// and the bit-width parameter has the same bit-width as the uncompressed DataBlock +// for example, if the input DataBlock has `bits_per_value` of `16`, there will be 2 bytes(16 bits) +// in front of each chunk storing the `bit-width` parameter. +macro_rules! chunk_data_impl { + ($data:expr, $data_type:ty) => {{ + let data_buffer = $data.data.borrow_to_typed_slice::<$data_type>(); + let data_buffer = data_buffer.as_ref(); + + let bit_widths = $data + .get_stat(Stat::BitWidth) + .expect("FixedWidthDataBlock should have valid bit width statistics"); + let bit_widths_array = bit_widths + .as_any() + .downcast_ref::>() + .unwrap(); + + let (packed_chunk_sizes, total_size) = bit_widths_array + .values() + .iter() + .map(|&bit_width| { + let chunk_size = ((1024 * bit_width) / $data.bits_per_value) as usize; + (chunk_size, chunk_size + 1) + }) + .fold( + (Vec::with_capacity(bit_widths_array.len()), 0), + |(mut sizes, total), (size, inc)| { + sizes.push(size); + (sizes, total + inc) + }, + ); + + let mut output: Vec<$data_type> = Vec::with_capacity(total_size); + let mut chunks = Vec::with_capacity(bit_widths_array.len()); + + for i in 0..bit_widths_array.len() - 1 { + let start_elem = i * ELEMS_PER_CHUNK as usize; + let bit_width = bit_widths_array.value(i) as $data_type; + output.push(bit_width); + let output_len = output.len(); + unsafe { + output.set_len(output_len + packed_chunk_sizes[i]); + BitPacking::unchecked_pack( + bit_width as usize, + &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize], + &mut output[output_len..][..packed_chunk_sizes[i]], + ); + } + chunks.push(MiniBlockChunk { + num_bytes: ((1 + packed_chunk_sizes[i]) * std::mem::size_of::<$data_type>()) as u16, + log_num_values: LOG_ELEMS_PER_CHUNK, + }); + } + + // Handle the last chunk + let last_chunk_elem_num = if $data.num_values % ELEMS_PER_CHUNK == 0 { + 1024 + } else { + $data.num_values % ELEMS_PER_CHUNK + }; + let mut last_chunk = vec![0; ELEMS_PER_CHUNK as usize]; + last_chunk[..last_chunk_elem_num as usize].clone_from_slice( + &data_buffer[$data.num_values as usize - last_chunk_elem_num as usize..], + ); + let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as $data_type; + output.push(bit_width); + let output_len = output.len(); + unsafe { + output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]); + BitPacking::unchecked_pack( + bit_width as usize, + &last_chunk, + &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]], + ); + } + chunks.push(MiniBlockChunk { + num_bytes: ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) + * std::mem::size_of::<$data_type>()) as u16, + log_num_values: 0, + }); + + ( + MiniBlockCompressed { + data: LanceBuffer::reinterpret_vec(output), + chunks, + num_values: $data.num_values, + }, + ProtobufUtils::bitpack2($data.bits_per_value), + ) + }}; +} + +#[derive(Debug, Default)] +pub struct BitpackMiniBlockEncoder {} + +impl BitpackMiniBlockEncoder { + fn chunk_data( + &self, + mut data: FixedWidthDataBlock, + ) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) { + assert!(data.bits_per_value % 8 == 0); + match data.bits_per_value { + 8 => chunk_data_impl!(data, u8), + 16 => chunk_data_impl!(data, u16), + 32 => chunk_data_impl!(data, u32), + 64 => chunk_data_impl!(data, u64), + _ => unreachable!(), + } + } +} + +impl MiniBlockCompressor for BitpackMiniBlockEncoder { + fn compress( + &self, + chunk: DataBlock, + ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> { + match chunk { + DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)), + _ => Err(Error::InvalidInput { + source: format!( + "Cannot compress a data block of type {} with BitpackMiniBlockEncoder", + chunk.name() + ) + .into(), + location: location!(), + }), + } + } +} + +/// A decompressor for fixed-width data that has +/// been written, as-is, to disk in single contiguous array +#[derive(Debug)] +pub struct BitpackMiniBlockDecompressor { + uncompressed_bit_width: u64, +} + +impl BitpackMiniBlockDecompressor { + pub fn new(description: &pb::Bitpack2) -> Self { + Self { + uncompressed_bit_width: description.uncompressed_bits_per_value, + } + } +} + +impl MiniBlockDecompressor for BitpackMiniBlockDecompressor { + fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result { + assert!(data.len() >= 8); + assert!(num_values <= ELEMS_PER_CHUNK); + + // This macro decompresses a chunk(1024 values) of bitpacked values. + macro_rules! decompress_impl { + ($type:ty) => {{ + let uncompressed_bit_width = std::mem::size_of::<$type>() * 8; + let mut decompressed = vec![0 as $type; ELEMS_PER_CHUNK as usize]; + + // Copy for memory alignment + let chunk_in_u8: Vec = data.to_vec(); + let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<$type>()]; + let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<$type>()); + let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<$type>()..]); + + // The bit-packed chunk should have number of bytes (bit_width_value * ELEMS_PER_CHUNK / 8) + assert!(chunk.len() * std::mem::size_of::<$type>() == (bit_width_value * ELEMS_PER_CHUNK as u64) as usize / 8); + + unsafe { + BitPacking::unchecked_unpack( + bit_width_value as usize, + chunk, + &mut decompressed, + ); + } + + decompressed.shrink_to(num_values as usize); + Ok(DataBlock::FixedWidth(FixedWidthDataBlock { + data: LanceBuffer::reinterpret_vec(decompressed), + bits_per_value: uncompressed_bit_width as u64, + num_values, + block_info: BlockInfo::new(), + used_encoding: UsedEncoding::new(), + })) + }}; + } + + match self.uncompressed_bit_width { + 8 => decompress_impl!(u8), + 16 => decompress_impl!(u16), + 32 => decompress_impl!(u32), + 64 => decompress_impl!(u64), + _ => todo!(), + } + } +} + +#[cfg(test)] +mod test { + use std::{collections::HashMap, sync::Arc}; + + use arrow_array::{Int64Array, Int8Array}; + + use arrow_schema::DataType; + + use arrow_array::Array; + + use crate::{ + testing::{check_round_trip_encoding_of_data, TestCases}, + version::LanceFileVersion, + }; + + #[test_log::test(tokio::test)] + async fn test_miniblock_bitpack() { + let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1); + + let arrays = vec![ + Arc::new(Int8Array::from(vec![100; 1024])) as Arc, + Arc::new(Int8Array::from(vec![1; 1024])) as Arc, + Arc::new(Int8Array::from(vec![16; 1024])) as Arc, + Arc::new(Int8Array::from(vec![-1; 1024])) as Arc, + Arc::new(Int8Array::from(vec![5; 1])) as Arc, + ]; + check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await; + + for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] { + let int64_arrays = vec![ + Int64Array::from(vec![3; 1024]), + Int64Array::from(vec![8; 1024]), + Int64Array::from(vec![16; 1024]), + Int64Array::from(vec![100; 1024]), + Int64Array::from(vec![512; 1024]), + Int64Array::from(vec![1000; 1024]), + Int64Array::from(vec![2000; 1024]), + Int64Array::from(vec![-1; 10]), + ]; + + let mut arrays = vec![]; + for int64_array in int64_arrays { + arrays.push(arrow_cast::cast(&int64_array, &data_type).unwrap()); + } + check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await; + } + } +} diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 647e683b86..ba66b15dc8 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -19,7 +19,7 @@ use pb::{ buffer::BufferType, nullable::{AllNull, NoNull, Nullability, SomeNull}, page_layout::Layout, - AllNullLayout, ArrayEncoding, Binary, Bitpacked, BitpackedForNonNeg, Dictionary, + AllNullLayout, ArrayEncoding, Binary, Bitpack2, Bitpacked, BitpackedForNonNeg, Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, MiniBlockLayout, Nullable, PackedStruct, PageLayout, }; @@ -125,6 +125,13 @@ impl ProtobufUtils { })), } } + pub fn bitpack2(uncompressed_bits_per_value: u64) -> ArrayEncoding { + ArrayEncoding { + array_encoding: Some(ArrayEncodingEnum::Bitpack2(Bitpack2 { + uncompressed_bits_per_value, + })), + } + } pub fn packed_struct( child_encodings: Vec,