-
Notifications
You must be signed in to change notification settings - Fork 245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: bitpack with miniblock #3067
Changes from all commits
964fee3
30d7f51
cc6e468
1f15274
22159e9
9f5dc6d
1ad38c1
edae11a
0f688fd
ca96e21
553e124
dcabd0f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ use arrow::datatypes::{ | |
}; | ||
use arrow_array::{Array, PrimitiveArray}; | ||
use arrow_schema::DataType; | ||
use byteorder::{ByteOrder, LittleEndian}; | ||
use bytes::Bytes; | ||
use futures::future::{BoxFuture, FutureExt}; | ||
use log::trace; | ||
|
@@ -20,12 +21,16 @@ use crate::buffer::LanceBuffer; | |
use crate::compression_algo::fastlanes::BitPacking; | ||
use crate::data::{BlockInfo, UsedEncoding}; | ||
use crate::data::{DataBlock, FixedWidthDataBlock, NullableDataBlock}; | ||
use crate::decoder::{PageScheduler, PrimitivePageDecoder}; | ||
use crate::encoder::{ArrayEncoder, EncodedArray}; | ||
use crate::format::ProtobufUtils; | ||
use crate::decoder::{MiniBlockDecompressor, PageScheduler, PrimitivePageDecoder}; | ||
use crate::encoder::{ | ||
ArrayEncoder, EncodedArray, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, | ||
}; | ||
use crate::format::{pb, ProtobufUtils}; | ||
use crate::statistics::{GetStat, Stat}; | ||
use arrow::array::ArrayRef; | ||
use bytemuck::cast_slice; | ||
const ELEMS_PER_CHUNK: u64 = 1024; | ||
const LOG_ELEMS_PER_CHUNK: u8 = 10; | ||
|
||
// Compute the compressed_bit_width for a given array of integers | ||
// todo: compute all statistics before encoding | ||
|
@@ -1560,3 +1565,247 @@ mod tests { | |
// check_round_trip_bitpacked(arr).await; | ||
// } | ||
} | ||
|
||
// This macro chunks a FixedWidth DataBlock and bitpacks it with 1024 values per chunk.
// Each chunk is prefixed with a bit-width parameter that has the same bit-width as the
// uncompressed DataBlock — e.g. if the input has `bits_per_value == 16`, there are
// 2 bytes (16 bits) in front of each chunk storing the bit-width parameter.
// The header is a full element rather than a single byte so the packed payload that
// follows it stays aligned to the element type, which the fastlanes bitpacking
// kernels require (see review discussion on this PR).
macro_rules! chunk_data_impl {
    ($data:expr, $data_type:ty) => {{
        let data_buffer = $data.data.borrow_to_typed_slice::<$data_type>();
        let data_buffer = data_buffer.as_ref();

        // One bit-width statistic is produced per 1024-value chunk.
        let bit_widths = $data
            .get_stat(Stat::BitWidth)
            .expect("FixedWidthDataBlock should have valid bit width statistics");
        let bit_widths_array = bit_widths
            .as_any()
            .downcast_ref::<PrimitiveArray<UInt64Type>>()
            .unwrap();

        // Sizes here are measured in number of `$data_type` elements, not bytes:
        // each chunk packs into `ELEMS_PER_CHUNK * bit_width / bits_per_value`
        // elements, plus one extra element for the bit-width header.
        let (packed_chunk_sizes, total_size) = bit_widths_array
            .values()
            .iter()
            .map(|&bit_width| {
                let chunk_size =
                    ((ELEMS_PER_CHUNK * bit_width) / $data.bits_per_value) as usize;
                (chunk_size, chunk_size + 1)
            })
            .fold(
                (Vec::with_capacity(bit_widths_array.len()), 0),
                |(mut sizes, total), (size, inc)| {
                    sizes.push(size);
                    (sizes, total + inc)
                },
            );

        let mut output: Vec<$data_type> = Vec::with_capacity(total_size);
        let mut chunks = Vec::with_capacity(bit_widths_array.len());

        // All chunks except the last hold exactly ELEMS_PER_CHUNK values.
        for i in 0..bit_widths_array.len() - 1 {
            let start_elem = i * ELEMS_PER_CHUNK as usize;
            let bit_width = bit_widths_array.value(i) as $data_type;
            output.push(bit_width);
            let output_len = output.len();
            // SAFETY: `set_len` exposes exactly `packed_chunk_sizes[i]` reserved
            // elements, all of which `unchecked_pack` fully initializes.
            unsafe {
                output.set_len(output_len + packed_chunk_sizes[i]);
                BitPacking::unchecked_pack(
                    bit_width as usize,
                    &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..packed_chunk_sizes[i]],
                );
            }
            chunks.push(MiniBlockChunk {
                num_bytes: ((1 + packed_chunk_sizes[i]) * std::mem::size_of::<$data_type>())
                    as u16,
                log_num_values: LOG_ELEMS_PER_CHUNK,
            });
        }

        // Handle the last chunk, which may be partial. It is zero-padded up to
        // ELEMS_PER_CHUNK values before packing.
        let last_chunk_elem_num = if $data.num_values % ELEMS_PER_CHUNK == 0 {
            ELEMS_PER_CHUNK
        } else {
            $data.num_values % ELEMS_PER_CHUNK
        };
        let mut last_chunk = vec![0; ELEMS_PER_CHUNK as usize];
        last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
            &data_buffer[$data.num_values as usize - last_chunk_elem_num as usize..],
        );
        let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as $data_type;
        output.push(bit_width);
        let output_len = output.len();
        // SAFETY: same reservation pattern as the loop above.
        unsafe {
            output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]);
            BitPacking::unchecked_pack(
                bit_width as usize,
                &last_chunk,
                &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]],
            );
        }
        chunks.push(MiniBlockChunk {
            num_bytes: ((1 + packed_chunk_sizes[bit_widths_array.len() - 1])
                * std::mem::size_of::<$data_type>()) as u16,
            // log_num_values == 0 marks the (possibly partial) trailing chunk.
            log_num_values: 0,
        });

        (
            MiniBlockCompressed {
                data: LanceBuffer::reinterpret_vec(output),
                chunks,
                num_values: $data.num_values,
            },
            ProtobufUtils::bitpack2($data.bits_per_value),
        )
    }};
}
|
||
#[derive(Debug, Default)] | ||
pub struct BitpackMiniBlockEncoder {} | ||
|
||
impl BitpackMiniBlockEncoder { | ||
fn chunk_data( | ||
&self, | ||
mut data: FixedWidthDataBlock, | ||
) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) { | ||
assert!(data.bits_per_value % 8 == 0); | ||
match data.bits_per_value { | ||
8 => chunk_data_impl!(data, u8), | ||
16 => chunk_data_impl!(data, u16), | ||
32 => chunk_data_impl!(data, u32), | ||
64 => chunk_data_impl!(data, u64), | ||
_ => unreachable!(), | ||
} | ||
} | ||
} | ||
|
||
impl MiniBlockCompressor for BitpackMiniBlockEncoder { | ||
fn compress( | ||
&self, | ||
chunk: DataBlock, | ||
) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> { | ||
match chunk { | ||
DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)), | ||
_ => Err(Error::InvalidInput { | ||
source: format!( | ||
"Cannot compress a data block of type {} with BitpackMiniBlockEncoder", | ||
chunk.name() | ||
) | ||
.into(), | ||
location: location!(), | ||
}), | ||
} | ||
} | ||
} | ||
|
||
/// A decompressor for fixed-width data that has | ||
/// been written, as-is, to disk in single contiguous array | ||
#[derive(Debug)] | ||
pub struct BitpackMiniBlockDecompressor { | ||
uncompressed_bit_width: u64, | ||
} | ||
|
||
impl BitpackMiniBlockDecompressor { | ||
pub fn new(description: &pb::Bitpack2) -> Self { | ||
Self { | ||
uncompressed_bit_width: description.uncompressed_bits_per_value, | ||
} | ||
} | ||
} | ||
|
||
impl MiniBlockDecompressor for BitpackMiniBlockDecompressor { | ||
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> { | ||
assert!(data.len() >= 8); | ||
assert!(num_values <= ELEMS_PER_CHUNK); | ||
|
||
// This macro decompresses a chunk(1024 values) of bitpacked values. | ||
macro_rules! decompress_impl { | ||
($type:ty) => {{ | ||
let uncompressed_bit_width = std::mem::size_of::<$type>() * 8; | ||
let mut decompressed = vec![0 as $type; ELEMS_PER_CHUNK as usize]; | ||
|
||
// Copy for memory alignment | ||
let chunk_in_u8: Vec<u8> = data.to_vec(); | ||
Comment on lines
+1726
to
+1727
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see the memory alignment concern now. Let's not worry about it now. In the future to fix this I think we need...
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here, I need to cast the raw data fetched from memory into slice of |
||
let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<$type>()]; | ||
let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<$type>()); | ||
let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<$type>()..]); | ||
|
||
// The bit-packed chunk should have number of bytes (bit_width_value * ELEMS_PER_CHUNK / 8) | ||
assert!(chunk.len() * std::mem::size_of::<$type>() == (bit_width_value * ELEMS_PER_CHUNK as u64) as usize / 8); | ||
|
||
unsafe { | ||
BitPacking::unchecked_unpack( | ||
bit_width_value as usize, | ||
chunk, | ||
&mut decompressed, | ||
); | ||
} | ||
|
||
decompressed.shrink_to(num_values as usize); | ||
Ok(DataBlock::FixedWidth(FixedWidthDataBlock { | ||
data: LanceBuffer::reinterpret_vec(decompressed), | ||
bits_per_value: uncompressed_bit_width as u64, | ||
num_values, | ||
block_info: BlockInfo::new(), | ||
used_encoding: UsedEncoding::new(), | ||
})) | ||
}}; | ||
} | ||
|
||
match self.uncompressed_bit_width { | ||
8 => decompress_impl!(u8), | ||
16 => decompress_impl!(u16), | ||
32 => decompress_impl!(u32), | ||
64 => decompress_impl!(u64), | ||
_ => todo!(), | ||
} | ||
} | ||
} | ||
|
||
#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Array, Int64Array, Int8Array};
    use arrow_schema::DataType;

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        // i8 data: full 1024-value chunks (including all-negative values, which
        // exercise the widest bit-width) plus a single-element trailing chunk.
        let arrays = vec![
            Arc::new(Int8Array::from(vec![100; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![16; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![-1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![5; 1])) as Arc<dyn Array>,
        ];
        check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;

        // Exercise the wider integer types by casting one i64 fixture to each.
        for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] {
            let int64_arrays = vec![
                Int64Array::from(vec![3; 1024]),
                Int64Array::from(vec![8; 1024]),
                Int64Array::from(vec![16; 1024]),
                Int64Array::from(vec![100; 1024]),
                Int64Array::from(vec![512; 1024]),
                Int64Array::from(vec![1000; 1024]),
                Int64Array::from(vec![2000; 1024]),
                Int64Array::from(vec![-1; 10]),
            ];

            let arrays: Vec<_> = int64_arrays
                .iter()
                .map(|int64_array| arrow_cast::cast(int64_array, &data_type).unwrap())
                .collect();
            check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;
        }
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need it for this PR, but I'm thinking of putting the new 2.1 encodings in
message Compression { }
instead of ArrayEncoding. On the plus side, we can probably name it bitpack
and not bitpack2 once we do that. Let's save it for future work though.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha, good idea.