Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add full zip encoding for wide data types #3114

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,19 @@ message MiniBlockLayout {
ArrayEncoding value_compression = 3;
}

/// A layout used for pages where the data is large (wide data types)
///
/// In this case the cost of transposing the data is relatively small (compared to the cost of writing the data)
/// and so we just zip the buffers together
///
/// NOTE(review): presumably the buffers being zipped are the repetition info, the definition
/// info and the compressed values described by the fields below -- confirm against the encoder.
message FullZipLayout {
// The number of bits of repetition info (0 if there is no repetition)
uint32 bits_rep = 1;
// The number of bits of definition info (0 if there is no definition)
uint32 bits_def = 2;
// Description of the compression of values
ArrayEncoding value_compression = 3;
}

/// A layout used for pages where all values are null
///
/// In addition, there can be no repetition levels and only a single definition level
Expand All @@ -327,5 +340,6 @@ message PageLayout {
oneof layout {
MiniBlockLayout mini_block_layout = 1;
AllNullLayout all_null_layout = 2;
FullZipLayout full_zip_layout = 3;
}
}
66 changes: 66 additions & 0 deletions rust/lance-core/src/utils/bit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,69 @@ pub fn pad_bytes_u64<const ALIGN: u64>(n: u64) -> u64 {
debug_assert!(is_pwr_two(ALIGN));
(ALIGN - (n & (ALIGN - 1))) & (ALIGN - 1)
}

/// Returns the number of bits needed to represent the given number
///
/// Despite the name, this is the bit length of `val`, i.e. `floor(log2(val)) + 1`, and not
/// the mathematical ceiling of `log2(val)`. The two differ whenever `val` is an exact power
/// of two (for example `log_2_ceil(4) == 3` and `log_2_ceil(1) == 1`).
///
/// # Panics
///
/// Panics if `val` is 0.
pub fn log_2_ceil(val: u32) -> u32 {
    assert!(val > 0);
    // The bit length is the width of the type minus the number of unused (zero) high bits.
    u32::BITS - val.leading_zeros()
}

#[cfg(test)]
pub mod tests {
    use crate::utils::bit::log_2_ceil;

    #[test]
    fn test_log_2_ceil() {
        // Reference implementation: count how many right shifts it takes to reach zero,
        // which is the bit length of `val`.
        fn classic_approach(mut val: u32) -> u32 {
            let mut counter = 0;
            while val > 0 {
                val >>= 1;
                counter += 1;
            }
            counter
        }

        // Exhaustive check of the small values
        for i in 1..(16 * 1024) {
            assert_eq!(log_2_ceil(i), classic_approach(i));
        }
        assert_eq!(log_2_ceil(50 * 1024), classic_approach(50 * 1024));
        assert_eq!(
            log_2_ceil(1024 * 1024 * 1024),
            classic_approach(1024 * 1024 * 1024)
        );

        // Check every power of two and the value just below it.  This covers each byte
        // boundary (255/256, 65535/65536, 2^24 - 1/2^24) so that every range of the
        // 32-bit input space is exercised, not only the low 16 bits.
        for shift in 0..u32::BITS {
            let pow = 1_u32 << shift;
            assert_eq!(log_2_ceil(pow), classic_approach(pow));
            assert_eq!(log_2_ceil(pow), shift + 1);
            if pow > 1 {
                assert_eq!(log_2_ceil(pow - 1), classic_approach(pow - 1));
            }
        }
        assert_eq!(log_2_ceil(u32::MAX), 32);
    }
}
5 changes: 5 additions & 0 deletions rust/lance-encoding/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ impl LanceBuffer {
/// Reinterprets a LanceBuffer into a Vec<T>
///
/// If the underlying buffer is not properly aligned, this will involve a copy of the data
///
/// Note: doing this sort of re-interpretation generally makes assumptions about the endianness
/// of the data. Lance does not support big-endian machines so this is safe. However, if we end
/// up supporting big-endian machines in the future, then any use of this method will need to be
/// carefully reviewed.
pub fn borrow_to_typed_slice<T: ArrowNativeType>(&mut self) -> impl AsRef<[T]> {
let align = std::mem::align_of::<T>();
let is_aligned = self.as_ptr().align_offset(align) == 0;
Expand Down
36 changes: 35 additions & 1 deletion rust/lance-encoding/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder {
}
}

pub struct FixedWidthDataBlockBuilder {
struct FixedWidthDataBlockBuilder {
bits_per_value: u64,
bytes_per_value: u64,
values: Vec<u8>,
Expand Down Expand Up @@ -493,6 +493,33 @@ impl FixedSizeListBlock {
}
}

/// Builder that produces a [`DataBlock::FixedSizeList`] by delegating all of the
/// actual accumulation to a builder for the child block.
struct FixedSizeListBlockBuilder {
    /// Builder that accumulates the child data
    inner: Box<dyn DataBlockBuilderImpl>,
    /// Factor used to scale a selection of lists into a selection of the child block.
    /// It is also recorded on the finished block.
    dimension: u64,
}

impl FixedSizeListBlockBuilder {
    /// Creates a builder that forwards to `inner`, scaling selections by `dimension`
    fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
        Self { inner, dimension }
    }
}

impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
    /// Appends the lists in `selection` by appending the matching range of the child block
    ///
    /// Both ends of `selection` are multiplied by the dimension before being handed to the
    /// inner builder.  Panics (via `unwrap`) if `data_block` cannot be viewed as a
    /// fixed-size-list block.
    fn append(&mut self, data_block: &mut DataBlock, selection: Range<u64>) {
        let Range { start, end } = selection;
        let child_range = (start * self.dimension)..(end * self.dimension);
        let list_block = data_block.as_fixed_size_list_mut_ref().unwrap();
        self.inner.append(list_block.child.as_mut(), child_range);
    }

    /// Finishes the inner builder and wraps its block in a fixed-size-list block
    fn finish(self: Box<Self>) -> DataBlock {
        let Self { inner, dimension } = *self;
        let child = Box::new(inner.finish());
        DataBlock::FixedSizeList(FixedSizeListBlock { child, dimension })
    }
}

/// A data block with no regular structure. There is no available spot to attach
/// validity / repdef information and it cannot be converted to Arrow without being
/// decoded
Expand Down Expand Up @@ -914,6 +941,13 @@ impl DataBlock {
todo!()
}
}
Self::FixedSizeList(inner) => {
let inner_builder = inner.child.make_builder(estimated_size_bytes);
Box::new(FixedSizeListBlockBuilder::new(
inner_builder,
inner.dimension,
))
}
_ => todo!(),
}
}
Expand Down
25 changes: 20 additions & 5 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ use crate::encodings::logical::r#struct::{
};
use crate::encodings::physical::binary::BinaryMiniBlockDecompressor;
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
Expand Down Expand Up @@ -454,8 +455,14 @@ pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}

pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
/// Trait for decompressors that turn a buffer of compressed values back into a data block
///
/// NOTE(review): presumably the read-side counterpart of the per-value compressors used by
/// the encoder -- confirm against the encoder module.
pub trait PerValueDecompressor: std::fmt::Debug + Send + Sync {
/// Decompress one or more values
///
/// `data` is the buffer to decompress and `num_values` is the number of values
/// (presumably the number of values contained in `data` -- TODO confirm)
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
/// The number of bits in each value
///
/// Returns 0 if the data type is variable-width
///
/// Currently (and probably long term) this must be a multiple of 8
fn bits_per_value(&self) -> u64;
}

Expand All @@ -469,10 +476,10 @@ pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
description: &pb::ArrayEncoding,
) -> Result<Box<dyn MiniBlockDecompressor>>;

fn create_fixed_per_value_decompressor(
fn create_per_value_decompressor(
&self,
description: &pb::ArrayEncoding,
) -> Result<Box<dyn FixedPerValueDecompressor>>;
) -> Result<Box<dyn PerValueDecompressor>>;

fn create_block_decompressor(
&self,
Expand Down Expand Up @@ -502,14 +509,22 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
}
}

fn create_fixed_per_value_decompressor(
fn create_per_value_decompressor(
&self,
description: &pb::ArrayEncoding,
) -> Result<Box<dyn FixedPerValueDecompressor>> {
) -> Result<Box<dyn PerValueDecompressor>> {
match description.array_encoding.as_ref().unwrap() {
pb::array_encoding::ArrayEncoding::Flat(flat) => {
Ok(Box::new(ValueDecompressor::new(flat)))
}
pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
let items_decompressor =
self.create_per_value_decompressor(fsl.items.as_ref().unwrap())?;
Ok(Box::new(FslPerValueDecompressor::new(
items_decompressor,
fsl.dimension as u64,
)))
}
_ => todo!(),
}
}
Expand Down
93 changes: 53 additions & 40 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use arrow_array::{Array, ArrayRef, RecordBatch, UInt8Array};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_arrow::DataTypeExt;
use lance_core::datatypes::{
Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY,
COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
Expand All @@ -31,6 +30,7 @@ use crate::encodings::physical::bitpack_fastlanes::{
};
use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fixed_size_list::FslPerValueCompressor;
use crate::encodings::physical::fsst::FsstArrayEncoder;
use crate::encodings::physical::packed_struct::PackedStructEncoder;
use crate::format::ProtobufUtils;
Expand Down Expand Up @@ -209,20 +209,30 @@ pub trait MiniBlockCompressor: std::fmt::Debug + Send + Sync {
fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)>;
}

/// The output of per-value compression, which must be one of the following:
///
/// - A single buffer of fixed-width values
/// - A single buffer of value data and a buffer of offsets
///
/// TODO: In the future we may allow metadata buffers
pub enum PerValueDataBlock {
/// The compressed data is a fixed-width data block
Fixed(FixedWidthDataBlock),
/// The compressed data is a variable-width data block
Variable(VariableWidthBlock),
}

/// Trait for compression algorithms that are suitable for use in the zipped structural encoding
///
/// Compared to [`VariablePerValueCompressor`], these compressors are capable of compressing the data
/// so that every value has the exact same number of bits per value. For example, this is useful
/// for encoding vector embeddings where every value has a fixed size but the values themselves are
/// too large to use mini-block.
/// This compression must return either a FixedWidthDataBlock or a VariableWidthBlock. This is because
/// we need to zip the data and those are the only two blocks we know how to zip today.
///
/// The advantage of a fixed-bytes-per-value is that we can do random access in 1 IOP instead of 2
/// and do not need a repetition index.
pub trait FixedPerValueCompressor: std::fmt::Debug + Send + Sync {
/// Compress the data into a single buffer where each value is encoded with the same number of bits
/// In addition, the compressed data must be able to be decompressed in a random-access fashion.
/// This means that the decompression algorithm must be able to decompress any value without
/// decompressing all values before it.
pub trait PerValueCompressor: std::fmt::Debug + Send + Sync {
/// Compress the data into a single buffer
///
/// Also returns a description of the compression that can be used to decompress when reading the data back
fn compress(&self, data: DataBlock) -> Result<(FixedWidthDataBlock, pb::ArrayEncoding)>;
fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)>;
}

/// Trait for compression algorithms that are suitable for use in the zipped structural encoding
Expand Down Expand Up @@ -407,11 +417,9 @@ pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {
/// There are several different kinds of compression.
///
/// - Block compression is the most generic, but most difficult to use efficiently
/// - Fixed-per-value compression results in a fixed number of bits for each value
/// It is used for wide fixed-width types like vector embeddings.
/// - Variable-per-value compression results in two buffers, one buffer of offsets
/// and one buffer of data bytes. It is used for wide variable-width types
/// like strings, variable-length lists, binary, etc.
/// - Per-value compression results in either a fixed width data block or a variable
/// width data block. In other words, there is some number of bits per value.
/// In addition, each value should be independently decompressible.
/// - Mini-block compression results in a small block of opaque data for chunks
/// of rows. Each block is somewhere between 0 and 16KiB in size. This is
/// used for narrow data types (both fixed and variable length) where we can
Expand All @@ -424,19 +432,12 @@ pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
data: &DataBlock,
) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;

/// Create a fixed-per-value compressor for the given data
fn create_fixed_per_value(
&self,
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn FixedPerValueCompressor>>;

/// Create a variable-per-value compressor for the given data
fn create_variable_per_value(
/// Create a per-value compressor for the given data
fn create_per_value(
&self,
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn VariablePerValueCompressor>>;
) -> Result<Box<dyn PerValueCompressor>>;

/// Create a mini-block compressor for the given data
fn create_miniblock_compressor(
Expand Down Expand Up @@ -816,23 +817,33 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
Ok(Box::new(ValueEncoder::default()))
}

fn create_fixed_per_value(
fn create_per_value(
&self,
field: &Field,
_data: &DataBlock,
) -> Result<Box<dyn FixedPerValueCompressor>> {
// Right now we only need block compressors for rep/def which is u16. Will need to expand
// this if we need block compression of other types.
assert!(field.data_type().byte_width() > 0);
Ok(Box::new(ValueEncoder::default()))
}

fn create_variable_per_value(
&self,
_field: &Field,
_data: &DataBlock,
) -> Result<Box<dyn VariablePerValueCompressor>> {
todo!()
data: &DataBlock,
) -> Result<Box<dyn PerValueCompressor>> {
match data {
DataBlock::FixedWidth(_) => {
let encoder = Box::new(ValueEncoder::default());
Ok(encoder)
}
DataBlock::VariableWidth(_variable_width) => {
todo!()
}
DataBlock::FixedSizeList(fsl) => {
let DataType::FixedSizeList(inner_field, field_dim) = field.data_type() else {
panic!("FSL data block without FSL field")
};
debug_assert_eq!(fsl.dimension, field_dim as u64);
let inner_compressor = self.create_per_value(
&inner_field.as_ref().try_into().unwrap(),
fsl.child.as_ref(),
)?;
let fsl_compressor = FslPerValueCompressor::new(inner_compressor, fsl.dimension);
Ok(Box::new(fsl_compressor))
}
_ => unreachable!(),
}
}

fn create_block_compressor(
Expand All @@ -841,6 +852,8 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
data: &DataBlock,
) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
match data {
// Right now we only need block compressors for rep/def which is u16. Will need to expand
// this if we need block compression of other types.
DataBlock::FixedWidth(fixed_width) => {
let encoder = Box::new(ValueEncoder::default());
let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
Expand Down
Loading
Loading