feat: bitpack with miniblock #3067

Merged: 12 commits, Oct 31, 2024
6 changes: 6 additions & 0 deletions protos/encodings.proto
@@ -211,6 +211,11 @@ message BitpackedForNonNeg {
Buffer buffer = 3;
}

message Bitpack2 {
// the number of bits of the uncompressed value. e.g. for a u32, this will be 32
uint64 uncompressed_bits_per_value = 2;
}

// An array encoding for shredded structs that will never be null
//
// There is no actual data in this column.
@@ -263,6 +268,7 @@ message ArrayEncoding {
FixedSizeBinary fixed_size_binary = 11;
BitpackedForNonNeg bitpacked_for_non_neg = 12;
Constant constant = 13;
Bitpack2 bitpack2 = 14;
Contributor: We don't need it for this PR, but I'm thinking of putting the new 2.1 encodings in `message Compression { }` instead of `ArrayEncoding`. On the plus side, we can probably name it `bitpack` and not `bitpack2` once we do that. Let's save that for future work, though.

Contributor Author: Gotcha, good idea.

}
}

1 change: 1 addition & 0 deletions rust/lance-encoding/Cargo.toml
@@ -42,6 +42,7 @@ bytemuck = "=1.18.0"
arrayref = "0.3.7"
paste = "1.0.15"
seq-macro = "0.3.5"
byteorder.workspace = true

[dev-dependencies]
lance-testing.workspace = true
4 changes: 4 additions & 0 deletions rust/lance-encoding/src/decoder.rs
@@ -246,6 +246,7 @@ use crate::encodings::logical::primitive::{
use crate::encodings::logical::r#struct::{
SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler,
};
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
@@ -490,6 +491,9 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
pb::array_encoding::ArrayEncoding::Flat(flat) => {
Ok(Box::new(ValueDecompressor::new(flat)))
}
pb::array_encoding::ArrayEncoding::Bitpack2(description) => {
Ok(Box::new(BitpackMiniBlockDecompressor::new(description)))
}
_ => todo!(),
}
}
15 changes: 13 additions & 2 deletions rust/lance-encoding/src/encoder.rs
@@ -22,8 +22,10 @@ use crate::encodings::logical::blob::BlobFieldEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructFieldEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::encodings::physical::bitpack_fastlanes::compute_compressed_bit_width_for_non_neg;
use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
use crate::encodings::physical::bitpack_fastlanes::{
compute_compressed_bit_width_for_non_neg, BitpackMiniBlockEncoder,
};
use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fsst::FsstArrayEncoder;
@@ -773,9 +775,18 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
fn create_miniblock_compressor(
&self,
field: &Field,
_data: &DataBlock,
data: &DataBlock,
) -> Result<Box<dyn MiniBlockCompressor>> {
assert!(field.data_type().byte_width() > 0);
if let DataBlock::FixedWidth(ref fixed_width_data) = data {
if fixed_width_data.bits_per_value == 8
|| fixed_width_data.bits_per_value == 16
|| fixed_width_data.bits_per_value == 32
|| fixed_width_data.bits_per_value == 64
{
return Ok(Box::new(BitpackMiniBlockEncoder::default()));
}
}
Ok(Box::new(ValueEncoder::default()))
}

1 change: 1 addition & 0 deletions rust/lance-encoding/src/encodings/physical.rs
@@ -283,6 +283,7 @@ pub fn decoder_from_array_encoding(
pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(),
// 2.1 only
pb::array_encoding::ArrayEncoding::Constant(_) => unreachable!(),
pb::array_encoding::ArrayEncoding::Bitpack2(_) => unreachable!(),
}
}

255 changes: 252 additions & 3 deletions rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs
@@ -8,6 +8,7 @@ use arrow::datatypes::{
};
use arrow_array::{Array, PrimitiveArray};
use arrow_schema::DataType;
use byteorder::{ByteOrder, LittleEndian};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use log::trace;
@@ -20,12 +21,16 @@ use crate::buffer::LanceBuffer;
use crate::compression_algo::fastlanes::BitPacking;
use crate::data::{BlockInfo, UsedEncoding};
use crate::data::{DataBlock, FixedWidthDataBlock, NullableDataBlock};
use crate::decoder::{PageScheduler, PrimitivePageDecoder};
use crate::encoder::{ArrayEncoder, EncodedArray};
use crate::format::ProtobufUtils;
use crate::decoder::{MiniBlockDecompressor, PageScheduler, PrimitivePageDecoder};
use crate::encoder::{
ArrayEncoder, EncodedArray, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
};
use crate::format::{pb, ProtobufUtils};
use crate::statistics::{GetStat, Stat};
use arrow::array::ArrayRef;
use bytemuck::cast_slice;
const ELEMS_PER_CHUNK: u64 = 1024;
const LOG_ELEMS_PER_CHUNK: u8 = 10;

// Compute the compressed_bit_width for a given array of integers
// todo: compute all statistics before encoding
@@ -1560,3 +1565,247 @@ mod tests {
// check_round_trip_bitpacked(arr).await;
// }
}

// This macro chunks a FixedWidth DataBlock and bitpacks it, 1024 values per chunk.
// It puts the bit-width parameter in front of each chunk, and the bit-width parameter
// is stored at the same width as the uncompressed DataBlock's values: for example, if
// the input DataBlock has `bits_per_value` of 16, each chunk is prefixed with 2 bytes
// (16 bits) storing the `bit-width` parameter.
Comment on lines +1571 to +1573

Contributor: Why not just always use 1 byte?

Contributor Author: Yeah, this is related to the type of the output vector. Its type is the same as the input data type (to satisfy the signature of `BitPacking::unchecked_pack`), so when we fill the bit-width into the output, its type is the same as the input data type.

Contributor Author (@broccoliSpicy, Oct 30, 2024): I think for fastlanes bitpack compression to work, the start position of the output buffer must be aligned to the input data type's alignment. I will do some experiments to test it.

Contributor Author: I think it will panic to squeeze a u8 into a vector of u64 and then treat the rest of the vector as u64, because the alignment requirement doesn't hold anymore.

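The chunk framing discussed above (a bit-width header word followed by the packed payload) can be sketched with a naive LSB-first packer. This is illustrative only — the actual encoder uses the transposed fastlanes layout via `BitPacking::unchecked_pack`, and the helper names below are hypothetical:

```rust
// Hypothetical illustration of the chunk framing: one header word holding the
// bit width (stored at the same width as the values), then the packed payload.
// This is a naive LSB-first packing, NOT the transposed fastlanes layout.
fn pack_u16(values: &[u16], bit_width: u32) -> Vec<u16> {
    let mut out = vec![bit_width as u16]; // header word
    let (mut acc, mut bits): (u32, u32) = (0, 0);
    for &v in values {
        acc |= (v as u32) << bits;
        bits += bit_width;
        while bits >= 16 {
            out.push((acc & 0xFFFF) as u16); // flush a full output word
            acc >>= 16;
            bits -= 16;
        }
    }
    if bits > 0 {
        out.push(acc as u16); // flush the remaining partial word
    }
    out
}

fn unpack_u16(packed: &[u16], count: usize) -> Vec<u16> {
    let bit_width = packed[0] as u32; // read the header word back
    let mask = (1u32 << bit_width) - 1;
    let (mut acc, mut bits, mut idx): (u32, u32, usize) = (0, 0, 1);
    let mut out = Vec::with_capacity(count);
    while out.len() < count {
        if bits < bit_width {
            acc |= (packed[idx] as u32) << bits; // refill from the payload
            idx += 1;
            bits += 16;
        }
        out.push((acc & mask) as u16);
        acc >>= bit_width;
        bits -= bit_width;
    }
    out
}

fn main() {
    let values = [3u16, 7, 31, 0, 12, 1, 30, 5];
    let packed = pack_u16(&values, 5);
    // header word + ceil(8 * 5 / 16) = 1 + 3 payload words
    assert_eq!(packed.len(), 4);
    assert_eq!(unpack_u16(&packed, values.len()), values.to_vec());
}
```

With a full 1024-value chunk the payload is always a whole number of words, which is why the real encoder needs no ceiling division.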
macro_rules! chunk_data_impl {
($data:expr, $data_type:ty) => {{
let data_buffer = $data.data.borrow_to_typed_slice::<$data_type>();
let data_buffer = data_buffer.as_ref();

let bit_widths = $data
.get_stat(Stat::BitWidth)
.expect("FixedWidthDataBlock should have valid bit width statistics");
let bit_widths_array = bit_widths
.as_any()
.downcast_ref::<PrimitiveArray<UInt64Type>>()
.unwrap();

let (packed_chunk_sizes, total_size) = bit_widths_array
Contributor: Are `chunk_size` and `total_size` a "number of values" measurement or a "number of bytes" measurement?

Contributor Author: `packed_chunk_sizes` and `total_size` are "number of values" measurements. It needs to be this way because the signature of `BitPacking::unchecked_pack` requires the output slice type to be the same as the input slice's, so when we use `total_size` in

      let mut output: Vec<$data_type> = Vec::with_capacity(total_size);

the output's size in bytes is `total_size * size_of::<$data_type>()`.

.values()
.iter()
.map(|&bit_width| {
let chunk_size = ((1024 * bit_width) / $data.bits_per_value) as usize;
(chunk_size, chunk_size + 1)
})
.fold(
(Vec::with_capacity(bit_widths_array.len()), 0),
|(mut sizes, total), (size, inc)| {
sizes.push(size);
(sizes, total + inc)
},
);

let mut output: Vec<$data_type> = Vec::with_capacity(total_size);
let mut chunks = Vec::with_capacity(bit_widths_array.len());

for i in 0..bit_widths_array.len() - 1 {
let start_elem = i * ELEMS_PER_CHUNK as usize;
let bit_width = bit_widths_array.value(i) as $data_type;
output.push(bit_width);
let output_len = output.len();
unsafe {
output.set_len(output_len + packed_chunk_sizes[i]);
BitPacking::unchecked_pack(
bit_width as usize,
&data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize],
&mut output[output_len..][..packed_chunk_sizes[i]],
);
}
chunks.push(MiniBlockChunk {
num_bytes: ((1 + packed_chunk_sizes[i]) * std::mem::size_of::<$data_type>()) as u16,
log_num_values: LOG_ELEMS_PER_CHUNK,
});
}

// Handle the last chunk
let last_chunk_elem_num = if $data.num_values % ELEMS_PER_CHUNK == 0 {
1024
} else {
$data.num_values % ELEMS_PER_CHUNK
};
let mut last_chunk = vec![0; ELEMS_PER_CHUNK as usize];
last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
&data_buffer[$data.num_values as usize - last_chunk_elem_num as usize..],
);
let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as $data_type;
output.push(bit_width);
let output_len = output.len();
unsafe {
output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]);
BitPacking::unchecked_pack(
bit_width as usize,
&last_chunk,
&mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]],
);
}
chunks.push(MiniBlockChunk {
num_bytes: ((1 + packed_chunk_sizes[bit_widths_array.len() - 1])
* std::mem::size_of::<$data_type>()) as u16,
log_num_values: 0,
});

(
MiniBlockCompressed {
data: LanceBuffer::reinterpret_vec(output),
chunks,
num_values: $data.num_values,
},
ProtobufUtils::bitpack2($data.bits_per_value),
)
}};
}
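The size bookkeeping in the macro above follows directly from the framing: each full chunk packs 1024 values at `bit_width` bits each, measured in words of the uncompressed type, plus one word for the bit-width header. A std-only sketch of that arithmetic (the helper names are mine, not the crate's; the division is exact because 1024 is a multiple of every supported `bits_per_value`):

```rust
// Words of the uncompressed type occupied by one packed chunk's payload
// (this mirrors `(1024 * bit_width) / $data.bits_per_value` in the macro).
fn packed_chunk_words(bit_width: u64, bits_per_value: u64) -> u64 {
    (1024 * bit_width) / bits_per_value
}

// On-disk chunk size in bytes: the bit-width header word plus the payload
// (this mirrors the `num_bytes` computation for each `MiniBlockChunk`).
fn chunk_num_bytes(bit_width: u64, bits_per_value: u64) -> u64 {
    (1 + packed_chunk_words(bit_width, bits_per_value)) * (bits_per_value / 8)
}

fn main() {
    // u16 values that need only 5 bits each:
    assert_eq!(packed_chunk_words(5, 16), 320); // 320 u16 payload words
    assert_eq!(chunk_num_bytes(5, 16), 642);    // (1 + 320) * 2 bytes
    // u64 values that need only 3 bits each:
    assert_eq!(chunk_num_bytes(3, 64), 392);    // (1 + 48) * 8 bytes
}
```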

#[derive(Debug, Default)]
pub struct BitpackMiniBlockEncoder {}

impl BitpackMiniBlockEncoder {
fn chunk_data(
&self,
mut data: FixedWidthDataBlock,
) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) {
assert!(data.bits_per_value % 8 == 0);
match data.bits_per_value {
8 => chunk_data_impl!(data, u8),
16 => chunk_data_impl!(data, u16),
32 => chunk_data_impl!(data, u32),
64 => chunk_data_impl!(data, u64),
_ => unreachable!(),
}
}
}

impl MiniBlockCompressor for BitpackMiniBlockEncoder {
fn compress(
&self,
chunk: DataBlock,
) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
match chunk {
DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)),
_ => Err(Error::InvalidInput {
source: format!(
"Cannot compress a data block of type {} with BitpackMiniBlockEncoder",
chunk.name()
)
.into(),
location: location!(),
}),
}
}
}

/// A decompressor for mini-block chunks that were compressed
/// with `BitpackMiniBlockEncoder`
#[derive(Debug)]
pub struct BitpackMiniBlockDecompressor {
uncompressed_bit_width: u64,
}

impl BitpackMiniBlockDecompressor {
pub fn new(description: &pb::Bitpack2) -> Self {
Self {
uncompressed_bit_width: description.uncompressed_bits_per_value,
}
}
}

impl MiniBlockDecompressor for BitpackMiniBlockDecompressor {
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
assert!(data.len() >= 8);
assert!(num_values <= ELEMS_PER_CHUNK);

// This macro decompresses a chunk(1024 values) of bitpacked values.
macro_rules! decompress_impl {
($type:ty) => {{
let uncompressed_bit_width = std::mem::size_of::<$type>() * 8;
let mut decompressed = vec![0 as $type; ELEMS_PER_CHUNK as usize];

// Copy for memory alignment
let chunk_in_u8: Vec<u8> = data.to_vec();
Comment on lines +1726 to +1727

Contributor: I see the memory alignment concern now. Let's not worry about it now. In the future, to fix this I think we need:

  • Each page must be aligned (preferably to something largish like 64 bytes)
  • The miniblock layout is responsible for ensuring that the mini-block buffer (that we get from the compressor) is either at the start of the page or is padded to sufficient alignment (again, can just do 64)
  • Each miniblock chunk should be aligned. The compressor can return a preferred alignment (we want to be more conservative here and use the smallest alignment we can) and the layout can be responsible for doing the actual padding.

Contributor Author: Here I need to cast the raw data fetched from memory into a slice of u8, u16, u32, or u64 based on its `uncompressed_bit_width`. To make this cast succeed, I need to make sure the data being cast satisfies the alignment requirement of a slice of u8, u16, u32, or u64. The bit-packed chunk itself guarantees that (it's a multiple of 1024 bits, i.e. 128 bytes); if the mini-block layout can guarantee it too, then this copy may be eliminated.

let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<$type>()];
let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<$type>());
let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<$type>()..]);

// The bit-packed chunk should have number of bytes (bit_width_value * ELEMS_PER_CHUNK / 8)
assert!(chunk.len() * std::mem::size_of::<$type>() == (bit_width_value * ELEMS_PER_CHUNK as u64) as usize / 8);

unsafe {
BitPacking::unchecked_unpack(
bit_width_value as usize,
chunk,
&mut decompressed,
);
}

decompressed.shrink_to(num_values as usize);
Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
data: LanceBuffer::reinterpret_vec(decompressed),
bits_per_value: uncompressed_bit_width as u64,
num_values,
block_info: BlockInfo::new(),
used_encoding: UsedEncoding::new(),
}))
}};
}

match self.uncompressed_bit_width {
8 => decompress_impl!(u8),
16 => decompress_impl!(u16),
32 => decompress_impl!(u32),
64 => decompress_impl!(u64),
_ => todo!(),
}
}
}
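The header parse at the top of `decompress` can be shown with std alone. This is a stand-in for `LittleEndian::read_uint`, assuming a u16-typed chunk and a hypothetical helper name:

```rust
// Sketch of the decompressor's header parse for a u16-typed chunk: the first
// size_of::<u16>() bytes hold the chunk's bit width, little-endian.
fn read_bit_width_u16(chunk: &[u8]) -> u64 {
    let header: [u8; 2] = chunk[..2].try_into().unwrap();
    u16::from_le_bytes(header) as u64
}

fn main() {
    // Build a fake chunk: header word (bit width 5) plus a zeroed payload.
    let mut chunk = 5u16.to_le_bytes().to_vec();
    chunk.extend(std::iter::repeat(0u8).take(640)); // 320 packed u16 words
    let bit_width = read_bit_width_u16(&chunk);
    assert_eq!(bit_width, 5);
    // The payload must hold bit_width * 1024 / 8 bytes, matching the
    // assert in `decompress`:
    assert_eq!(chunk.len() - 2, (bit_width * 1024 / 8) as usize);
}
```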

#[cfg(test)]
mod test {
use std::{collections::HashMap, sync::Arc};

use arrow_array::{Int64Array, Int8Array};

use arrow_schema::DataType;

use arrow_array::Array;

use crate::{
testing::{check_round_trip_encoding_of_data, TestCases},
version::LanceFileVersion,
};

#[test_log::test(tokio::test)]
async fn test_miniblock_bitpack() {
let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

let arrays = vec![
Arc::new(Int8Array::from(vec![100; 1024])) as Arc<dyn Array>,
Arc::new(Int8Array::from(vec![1; 1024])) as Arc<dyn Array>,
Arc::new(Int8Array::from(vec![16; 1024])) as Arc<dyn Array>,
Arc::new(Int8Array::from(vec![-1; 1024])) as Arc<dyn Array>,
Arc::new(Int8Array::from(vec![5; 1])) as Arc<dyn Array>,
];
check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;

for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] {
let int64_arrays = vec![
Int64Array::from(vec![3; 1024]),
Int64Array::from(vec![8; 1024]),
Int64Array::from(vec![16; 1024]),
Int64Array::from(vec![100; 1024]),
Int64Array::from(vec![512; 1024]),
Int64Array::from(vec![1000; 1024]),
Int64Array::from(vec![2000; 1024]),
Int64Array::from(vec![-1; 10]),
];

let mut arrays = vec![];
for int64_array in int64_arrays {
arrays.push(arrow_cast::cast(&int64_array, &data_type).unwrap());
}
check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;
}
}
}
9 changes: 8 additions & 1 deletion rust/lance-encoding/src/format.rs
@@ -19,7 +19,7 @@ use pb::{
buffer::BufferType,
nullable::{AllNull, NoNull, Nullability, SomeNull},
page_layout::Layout,
AllNullLayout, ArrayEncoding, Binary, Bitpacked, BitpackedForNonNeg, Dictionary,
AllNullLayout, ArrayEncoding, Binary, Bitpack2, Bitpacked, BitpackedForNonNeg, Dictionary,
FixedSizeBinary, FixedSizeList, Flat, Fsst, MiniBlockLayout, Nullable, PackedStruct,
PageLayout,
};
@@ -125,6 +125,13 @@ impl ProtobufUtils {
})),
}
}
pub fn bitpack2(uncompressed_bits_per_value: u64) -> ArrayEncoding {
ArrayEncoding {
array_encoding: Some(ArrayEncodingEnum::Bitpack2(Bitpack2 {
uncompressed_bits_per_value,
})),
}
}

pub fn packed_struct(
child_encodings: Vec<ArrayEncoding>,