From 692a1368235fdc608eeab482ba185c61bc218e32 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 11 Nov 2024 06:08:54 -0800 Subject: [PATCH 1/3] Added a full-zip structural layout for wide data types --- protos/encodings.proto | 14 + rust/lance-core/src/utils/bit.rs | 75 +++ rust/lance-encoding/src/buffer.rs | 6 +- rust/lance-encoding/src/data.rs | 36 +- rust/lance-encoding/src/decoder.rs | 25 +- rust/lance-encoding/src/encoder.rs | 93 +-- .../src/encodings/logical/primitive.rs | 389 ++++++++++++- .../src/encodings/physical/fixed_size_list.rs | 105 +++- .../src/encodings/physical/value.rs | 33 +- rust/lance-encoding/src/format.rs | 18 +- rust/lance-encoding/src/repdef.rs | 544 +++++++++++++++++- 11 files changed, 1263 insertions(+), 75 deletions(-) diff --git a/protos/encodings.proto b/protos/encodings.proto index c44ec9d61a..0f02c1b8d3 100644 --- a/protos/encodings.proto +++ b/protos/encodings.proto @@ -314,6 +314,19 @@ message MiniBlockLayout { ArrayEncoding value_compression = 3; } +/// A layout used for pages where the data is large +/// +/// In this case the cost of transposing the data is relatively small (compared to the cost of writing the data) +/// and so we just zip the buffers together +message FullZipLayout { + // The number of bits of repetition info (0 if there is no repetition) + uint32 bits_rep = 1; + // The number of bits of definition info (0 if there is no definition) + uint32 bits_def = 2; + // Description of the compression of values + ArrayEncoding value_compression = 3; +} + /// A layout used for pages where all values are null /// /// In addition, there can be no repetition levels and only a single definition level @@ -327,5 +340,6 @@ message PageLayout { oneof layout { MiniBlockLayout mini_block_layout = 1; AllNullLayout all_null_layout = 2; + FullZipLayout full_zip_layout = 3; } } \ No newline at end of file diff --git a/rust/lance-core/src/utils/bit.rs b/rust/lance-core/src/utils/bit.rs index a33f89fa9a..345e5f67cc 100644 --- a/rust/lance-core/src/utils/bit.rs +++ b/rust/lance-core/src/utils/bit.rs @@ -19,3 +19,78 @@ pub fn pad_bytes_u64(n: u64) -> u64 { debug_assert!(is_pwr_two(ALIGN)); (ALIGN - (n & (ALIGN - 1))) & (ALIGN - 1) } + +// This is a lookup table for the log2 of the first 256 numbers +const LOG_TABLE_256: [u8; 256] = [ + 0, 1, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, + 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, + 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, + 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, + 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, + 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, + 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, + 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, +]; + +/// Returns the number of bits needed to represent the given number +/// +/// Inspired by https://graphics.stanford.edu/~seander/bithacks.html +pub fn log_2_ceil(val: u32) -> u32 { + assert!(val > 0); + let upper_half = val >> 16; + if upper_half == 0 { + let third_quarter = val >> 8; + if third_quarter == 0 { + // Use lowest 8 bits (upper 24 are 0) + LOG_TABLE_256[val as usize] as u32 + } else { + // Use bits 16..24 (0..16 are 0) + 
LOG_TABLE_256[third_quarter as usize] as u32 + 8 + } + } else { + let first_quarter = upper_half >> 8; + if first_quarter == 0 { + // Use bits 8..16 (0..8 are 0) + 16 + LOG_TABLE_256[upper_half as usize] as u32 + } else { + // Use most significant bits (it's a big number!) + 24 + LOG_TABLE_256[first_quarter as usize] as u32 + } + } +} + +/// Asserts if the currently running system is not little_endian +pub fn check_little_endian() { + assert_eq!( + 1u16.to_le_bytes(), + [1, 0], + "This code is for little-endian systems only" + ); +} + +#[cfg(test)] + +pub mod tests { + use crate::utils::bit::log_2_ceil; + + #[test] + fn test_log_2_ceil() { + fn classic_approach(mut val: u32) -> u32 { + let mut counter = 0; + while val > 0 { + val >>= 1; + counter += 1; + } + counter + } + + for i in 1..(16 * 1024) { + assert_eq!(log_2_ceil(i), classic_approach(i)); + } + assert_eq!(log_2_ceil(50 * 1024), classic_approach(50 * 1024)); + assert_eq!( + log_2_ceil(1024 * 1024 * 1024), + classic_approach(1024 * 1024 * 1024) + ); + } +} diff --git a/rust/lance-encoding/src/buffer.rs b/rust/lance-encoding/src/buffer.rs index 617e3bd6e6..abca880cd9 100644 --- a/rust/lance-encoding/src/buffer.rs +++ b/rust/lance-encoding/src/buffer.rs @@ -8,7 +8,10 @@ use std::{ops::Deref, ptr::NonNull, sync::Arc}; use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer}; use snafu::{location, Location}; -use lance_core::{utils::bit::is_pwr_two, Error, Result}; +use lance_core::{ + utils::bit::{check_little_endian, is_pwr_two}, + Error, Result, +}; /// A copy-on-write byte buffer /// @@ -223,6 +226,7 @@ impl LanceBuffer { /// /// If the underlying buffer is not properly aligned, this will involve a copy of the data pub fn borrow_to_typed_slice(&mut self) -> impl AsRef<[T]> { + check_little_endian(); let align = std::mem::align_of::(); let is_aligned = self.as_ptr().align_offset(align) == 0; if self.len() % std::mem::size_of::() != 0 { diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index 2bd9967b7a..495f8eb069 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -347,7 +347,7 @@ impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder { } } -pub struct FixedWidthDataBlockBuilder { +struct FixedWidthDataBlockBuilder { bits_per_value: u64, bytes_per_value: u64, values: Vec, @@ -493,6 +493,33 @@ impl FixedSizeListBlock { } } +struct FixedSizeListBlockBuilder { + inner: Box, + dimension: u64, +} + +impl FixedSizeListBlockBuilder { + fn new(inner: Box, dimension: u64) -> Self { + Self { inner, dimension } + } +} + +impl DataBlockBuilderImpl for FixedSizeListBlockBuilder { + fn append(&mut self, data_block: &mut DataBlock, selection: Range) { + let selection = selection.start * self.dimension..selection.end * self.dimension; + let fsl = data_block.as_fixed_size_list_mut_ref().unwrap(); + self.inner.append(fsl.child.as_mut(), selection); + } + + fn finish(self: Box) -> DataBlock { + let inner_block = self.inner.finish(); + DataBlock::FixedSizeList(FixedSizeListBlock { + child: Box::new(inner_block), + dimension: self.dimension, + }) + } +} + /// A data block with no regular structure. 
There is no available spot to attach /// validity / repdef information and it cannot be converted to Arrow without being /// decoded @@ -914,6 +941,13 @@ impl DataBlock { todo!() } } + Self::FixedSizeList(inner) => { + let inner_builder = inner.child.make_builder(estimated_size_bytes); + Box::new(FixedSizeListBlockBuilder::new( + inner_builder, + inner.dimension, + )) + } _ => todo!(), } } diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 0e0f366a46..949f9ad9d3 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -248,6 +248,7 @@ use crate::encodings::logical::r#struct::{ }; use crate::encodings::physical::binary::BinaryMiniBlockDecompressor; use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor; +use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor; use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor}; use crate::encodings::physical::{ColumnBuffers, FileBuffers}; use crate::format::pb::{self, column_encoding}; @@ -454,8 +455,14 @@ pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync { fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result; } -pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync { +pub trait PerValueDecompressor: std::fmt::Debug + Send + Sync { + /// Decompress one or more values fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result; + /// The number of bits in each value + /// + /// Returns 0 if the data type is variable-width + /// + /// Currently (and probably long term) this must be a multiple of 8 fn bits_per_value(&self) -> u64; } @@ -469,10 +476,10 @@ pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync { description: &pb::ArrayEncoding, ) -> Result>; - fn create_fixed_per_value_decompressor( + fn create_per_value_decompressor( &self, description: &pb::ArrayEncoding, - ) -> Result>; + ) -> Result>; fn create_block_decompressor( &self, @@ -502,14 +509,22 @@ impl DecompressorStrategy for CoreDecompressorStrategy { } } - fn create_fixed_per_value_decompressor( + fn create_per_value_decompressor( &self, description: &pb::ArrayEncoding, - ) -> Result> { + ) -> Result> { match description.array_encoding.as_ref().unwrap() { pb::array_encoding::ArrayEncoding::Flat(flat) => { Ok(Box::new(ValueDecompressor::new(flat))) } + pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => { + let items_decompressor = + self.create_per_value_decompressor(fsl.items.as_ref().unwrap())?; + Ok(Box::new(FslPerValueDecompressor::new( + items_decompressor, + fsl.dimension as u64, + ))) + } _ => todo!(), } } diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 57c73ff8e3..f3b85000dc 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -8,7 +8,6 @@ use arrow_array::{Array, ArrayRef, RecordBatch, UInt8Array}; use arrow_schema::DataType; use bytes::{Bytes, BytesMut}; use futures::future::BoxFuture; -use lance_arrow::DataTypeExt; use lance_core::datatypes::{ Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY, @@ -31,6 +30,7 @@ use crate::encodings::physical::bitpack_fastlanes::{ }; use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme}; use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder; +use crate::encodings::physical::fixed_size_list::FslPerValueCompressor; use 
crate::encodings::physical::fsst::FsstArrayEncoder; use crate::encodings::physical::packed_struct::PackedStructEncoder; use crate::format::ProtobufUtils; @@ -209,20 +209,30 @@ pub trait MiniBlockCompressor: std::fmt::Debug + Send + Sync { fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)>; } +/// Per-value compression must either: +/// +/// A single buffer of fixed-width values +/// A single buffer of value data and a buffer of offsets +/// +/// TODO: In the future we may allow metadata buffers +pub enum PerValueDataBlock { + Fixed(FixedWidthDataBlock), + Variable(VariableWidthBlock), +} + /// Trait for compression algorithms that are suitable for use in the zipped structural encoding /// -/// Compared to [`VariablePerValueCompressor`], these compressors are capable of compressing the data -/// so that every value has the exact same number of bits per value. For example, this is useful -/// for encoding vector embeddings where every value has a fixed size but the values themselves are -/// too large to use mini-block. +/// This compression must return either a FixedWidthDataBlock or a VariableWidthBlock. This is because +/// we need to zip the data and those are the only two blocks we know how to zip today. /// -/// The advantage of a fixed-bytes-per-value is that we can do random access in 1 IOP instead of 2 -/// and do not need a repetition index. -pub trait FixedPerValueCompressor: std::fmt::Debug + Send + Sync { - /// Compress the data into a single buffer where each value is encoded with the same number of bits +/// In addition, the compressed data must be able to be decompressed in a random-access fashion. +/// This means that the decompression algorithm must be able to decompress any value without +/// decompressing all values before it. +pub trait PerValueCompressor: std::fmt::Debug + Send + Sync { + /// Compress the data into a single buffer /// /// Also returns a description of the compression that can be used to decompress when reading the data back - fn compress(&self, data: DataBlock) -> Result<(FixedWidthDataBlock, pb::ArrayEncoding)>; + fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)>; } /// Trait for compression algorithms that are suitable for use in the zipped structural encoding @@ -407,11 +417,9 @@ pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug { /// There are several different kinds of compression. /// /// - Block compression is the most generic, but most difficult to use efficiently -/// - Fixed-per-value compression results in a fixed number of bits for each value -/// It is used for wide fixed-width types like vector embeddings. -/// - Variable-per-value compression results in two buffers, one buffer of offsets -/// and one buffer of data bytes. It is used for wide variable-width types -/// like strings, variable-length lists, binary, etc. +/// - Per-value compression results in either a fixed width data block or a variable +/// width data block. In other words, there is some number of bits per value. +/// In addition, each value should be independently decompressible. /// - Mini-block compression results in a small block of opaque data for chunks /// of rows. Each block is somewhere between 0 and 16KiB in size. 
This is /// used for narrow data types (both fixed and variable length) where we can @@ -424,19 +432,12 @@ pub trait CompressionStrategy: Send + Sync + std::fmt::Debug { data: &DataBlock, ) -> Result<(Box, pb::ArrayEncoding)>; - /// Create a fixed-per-value compressor for the given data - fn create_fixed_per_value( - &self, - field: &Field, - data: &DataBlock, - ) -> Result>; - - /// Create a variable-per-value compressor for the given data - fn create_variable_per_value( + /// Create a per-value compressor for the given data + fn create_per_value( &self, field: &Field, data: &DataBlock, - ) -> Result>; + ) -> Result>; /// Create a mini-block compressor for the given data fn create_miniblock_compressor( @@ -816,23 +817,33 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { Ok(Box::new(ValueEncoder::default())) } - fn create_fixed_per_value( + fn create_per_value( &self, field: &Field, - _data: &DataBlock, - ) -> Result> { - // Right now we only need block compressors for rep/def which is u16. Will need to expand - // this if we need block compression of other types. - assert!(field.data_type().byte_width() > 0); - Ok(Box::new(ValueEncoder::default())) - } - - fn create_variable_per_value( - &self, - _field: &Field, - _data: &DataBlock, - ) -> Result> { - todo!() + data: &DataBlock, + ) -> Result> { + match data { + DataBlock::FixedWidth(_) => { + let encoder = Box::new(ValueEncoder::default()); + Ok(encoder) + } + DataBlock::VariableWidth(_variable_width) => { + todo!() + } + DataBlock::FixedSizeList(fsl) => { + let DataType::FixedSizeList(inner_field, field_dim) = field.data_type() else { + panic!("FSL data block without FSL field") + }; + debug_assert_eq!(fsl.dimension, field_dim as u64); + let inner_compressor = self.create_per_value( + &inner_field.as_ref().try_into().unwrap(), + fsl.child.as_ref(), + )?; + let fsl_compressor = FslPerValueCompressor::new(inner_compressor, fsl.dimension); + Ok(Box::new(fsl_compressor)) + } + _ => unreachable!(), + } } fn create_block_compressor( @@ -841,6 +852,8 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { data: &DataBlock, ) -> Result<(Box, pb::ArrayEncoding)> { match data { + // Right now we only need block compressors for rep/def which is u16. Will need to expand + // this if we need block compression of other types. 
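+            // As an illustration (the widths here are hypothetical): a buffer of u16
+            // rep/def levels arrives as a FixedWidthDataBlock with bits_per_value = 16
+            // and is described by a flat encoding, i.e. plain little-endian values
+            // with no further compression applied.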
DataBlock::FixedWidth(fixed_width) => {
                let encoder = Box::new(ValueEncoder::default());
                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs
index 434308486f..c25d830301 100644
--- a/rust/lance-encoding/src/encodings/logical/primitive.rs
+++ b/rust/lance-encoding/src/encodings/logical/primitive.rs
@@ -13,7 +13,10 @@ use lance_core::utils::bit::pad_bytes;
 use log::{debug, trace};
 use snafu::{location, Location};
-use crate::data::{AllNullDataBlock, DataBlock};
+use crate::data::{AllNullDataBlock, DataBlock, VariableWidthBlock};
+use crate::decoder::PerValueDecompressor;
+use crate::encoder::PerValueDataBlock;
+use crate::repdef::{build_control_word_iterator, ControlWordIterator, ControlWordParser};
 use crate::statistics::{GetStat, Stat};
 use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};
@@ -798,6 +801,226 @@ impl StructuralPageScheduler for MiniBlockScheduler {
     }
 }
+/// A scheduler for full-zip encoded data
+///
+/// When the data type has a fixed width then we simply need to map from
+/// row ranges to byte ranges using the fixed width of the data type.
+///
+/// When the data type is variable-width or has any repetition then a
+/// repetition index is required.
+#[derive(Debug)]
+pub struct FullZipScheduler {
+    data_buf_position: u64,
+    priority: u64,
+    rows_in_page: u64,
+    value_decompressor: Arc<dyn PerValueDecompressor>,
+    ctrl_word_parser: ControlWordParser,
+}
+
+impl FullZipScheduler {
+    fn try_new(
+        buffer_offsets_and_sizes: &[(u64, u64)],
+        priority: u64,
+        rows_in_page: u64,
+        layout: &pb::FullZipLayout,
+        decompressors: &dyn DecompressorStrategy,
+    ) -> Result<Self> {
+        // We don't need the data_buf_size because either the data type is
+        // fixed-width (and we can tell the size from rows_in_page) or it is not
+        // and we have a repetition index.
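+        // As a sketch of the arithmetic (sizes are illustrative): a page of 1024
+        // rows of 4-byte values zipped with 1-byte control words occupies exactly
+        // 1024 * (4 + 1) bytes starting at data_buf_position, so the buffer size
+        // can always be recomputed when it is needed.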
+ let (data_buf_position, _) = buffer_offsets_and_sizes[0]; + let value_decompressor = decompressors + .create_per_value_decompressor(layout.value_compression.as_ref().unwrap())?; + let ctrl_word_parser = ControlWordParser::new( + layout.bits_rep.try_into().unwrap(), + layout.bits_def.try_into().unwrap(), + ); + Ok(Self { + data_buf_position, + value_decompressor: value_decompressor.into(), + priority, + rows_in_page, + ctrl_word_parser, + }) + } +} + +impl StructuralPageScheduler for FullZipScheduler { + fn initialize<'a>(&'a mut self, _io: &Arc) -> BoxFuture<'a, Result<()>> { + std::future::ready(Ok(())).boxed() + } + + fn schedule_ranges( + &self, + ranges: &[Range], + io: &dyn EncodingsIo, + ) -> Result>>> { + let bits_per_value = self.value_decompressor.bits_per_value(); + assert_eq!(bits_per_value % 8, 0); + let bytes_per_value = bits_per_value / 8; + let bytes_per_cw = self.ctrl_word_parser.bytes_per_word(); + let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64; + // We simply map row ranges into byte ranges + let byte_ranges = ranges.iter().map(|r| { + debug_assert!(r.end <= self.rows_in_page); + let start = self.data_buf_position + r.start * total_bytes_per_value; + let end = self.data_buf_position + r.end * total_bytes_per_value; + start..end + }); + let data = io.submit_request(byte_ranges.collect(), self.priority); + let value_decompressor = self.value_decompressor.clone(); + let num_rows = ranges.iter().map(|r| r.end - r.start).sum(); + let ctrl_word_parser = self.ctrl_word_parser; + Ok(async move { + let data = data.await?; + let data = data + .into_iter() + .map(|d| LanceBuffer::from_bytes(d, 1)) + .collect(); + Ok(Box::new(FixedFullZipDecoder { + value_decompressor, + data, + num_rows, + ctrl_word_parser, + offset_in_current: 0, + bytes_per_value: bytes_per_value as usize, + total_bytes_per_value: total_bytes_per_value as usize, + }) as Box) + } + .boxed()) + } +} + +/// A decoder for full-zip encoded data when the data has a fixed-width +/// +/// Here we need to unzip the control words from the values themselves and +/// then decompress the requested values. +/// +/// We use a PerValueDecompressor because we will only be decompressing the +/// requested data. This decoder / scheduler does not do any read amplification. 
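+///
+/// As a sketch of the zipped layout being consumed (the widths here are
+/// illustrative): with 16-byte values and a 1-byte control word each row
+/// occupies 17 bytes, so rows 10..20 live at bytes 170..340 of the page's data
+/// buffer and can be fetched and unzipped without touching neighboring rows.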
+#[derive(Debug)] +struct FixedFullZipDecoder { + value_decompressor: Arc, + ctrl_word_parser: ControlWordParser, + data: VecDeque, + offset_in_current: usize, + bytes_per_value: usize, + total_bytes_per_value: usize, + num_rows: u64, +} + +impl StructuralPageDecoder for FixedFullZipDecoder { + fn drain(&mut self, num_rows: u64) -> Result> { + let mut task_data = Vec::with_capacity(self.data.len()); + let mut remaining = num_rows; + while remaining > 0 { + let cur_buf = self.data.front_mut().unwrap(); + let bytes_avail = cur_buf.len() - self.offset_in_current; + + let bytes_needed = remaining as usize * self.total_bytes_per_value; + let bytes_to_take = bytes_needed.min(bytes_avail); + + let task_slice = cur_buf.slice_with_length(self.offset_in_current, bytes_to_take); + let rows_in_task = (bytes_to_take / self.total_bytes_per_value) as u64; + + task_data.push((task_slice, rows_in_task)); + + remaining -= rows_in_task; + if bytes_to_take + self.offset_in_current == cur_buf.len() { + self.data.pop_front(); + self.offset_in_current = 0; + } else { + self.offset_in_current += bytes_to_take; + } + } + let num_rows = task_data.iter().map(|td| td.1).sum::() as usize; + Ok(Box::new(FixedFullZipDecodeTask { + value_decompressor: self.value_decompressor.clone(), + ctrl_word_parser: self.ctrl_word_parser, + data: task_data, + bytes_per_value: self.bytes_per_value, + num_rows, + })) + } + + fn num_rows(&self) -> u64 { + self.num_rows + } +} + +/// A task to unzip and decompress full-zip encoded data when that data +/// has a fixed-width. +#[derive(Debug)] +struct FixedFullZipDecodeTask { + value_decompressor: Arc, + ctrl_word_parser: ControlWordParser, + data: Vec<(LanceBuffer, u64)>, + num_rows: usize, + bytes_per_value: usize, +} + +impl DecodePageTask for FixedFullZipDecodeTask { + fn decode(self: Box) -> Result { + // Multiply by 2 to make a stab at the size of the output buffer (which will be decompressed and thus bigger) + let estimated_size_bytes = self.data.iter().map(|data| data.0.len()).sum::() * 2; + let mut data_builder = + DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64); + + if self.ctrl_word_parser.bytes_per_word() == 0 { + // Fast path, no need to unzip because there is no rep/def + // + // We decompress each buffer and add it to our output buffer + for (buf, rows_in_buf) in self.data.into_iter() { + let mut decompressed = self.value_decompressor.decompress(buf, rows_in_buf)?; + data_builder.append(&mut decompressed, 0..rows_in_buf); + } + + Ok(DecodedPage { + data: data_builder.finish(), + repetition: None, + definition: None, + }) + } else { + // Slow path, unzipping needed + let mut rep = Vec::with_capacity(self.num_rows); + let mut def = Vec::with_capacity(self.num_rows); + + for (buf, rows_in_buf) in self.data.into_iter() { + let mut buf_slice = buf.as_ref(); + // We will be unzipping repdef in to `rep` and `def` and the + // values into `values` (which contains the compressed values) + let mut values = Vec::with_capacity( + buf.len() - (self.ctrl_word_parser.bytes_per_word() * rows_in_buf as usize), + ); + for _ in 0..rows_in_buf { + // Extract rep/def + self.ctrl_word_parser.parse(buf_slice, &mut rep, &mut def); + buf_slice = &buf_slice[self.ctrl_word_parser.bytes_per_word()..]; + // Extract value + values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref()); + buf_slice = &buf_slice[self.bytes_per_value..]; + } + + // Finally, we decompress the values and add them to our output buffer + let values_buf = LanceBuffer::Owned(values); + let mut 
decompressed = self + .value_decompressor + .decompress(values_buf, rows_in_buf)?; + data_builder.append(&mut decompressed, 0..rows_in_buf); + } + + let repetition = if rep.is_empty() { None } else { Some(rep) }; + let definition = if def.is_empty() { None } else { Some(def) }; + + Ok(DecodedPage { + data: data_builder.finish(), + repetition, + definition, + }) + } + } +} + #[derive(Debug)] struct StructuralPrimitiveFieldSchedulingJob<'a> { scheduler: &'a StructuralPrimitiveFieldScheduler, @@ -960,6 +1183,15 @@ impl StructuralPrimitiveFieldScheduler { decompressors, )?) } + Some(pb::page_layout::Layout::FullZipLayout(full_zip)) => { + Box::new(FullZipScheduler::try_new( + &page_info.buffer_offsets_and_sizes, + page_info.priority, + page_info.num_rows, + full_zip, + decompressors, + )?) + } Some(pb::page_layout::Layout::AllNullLayout(_)) => { Box::new(SimpleAllNullScheduler::default()) as Box } @@ -1708,7 +1940,7 @@ impl PrimitiveStructuralEncoder { ) } - /// Compresses a buffer of levels + /// Compresses a buffer of levels into chunks /// /// TODO: Use bit-packing here fn compress_levels( @@ -1834,6 +2066,131 @@ impl PrimitiveStructuralEncoder { }) } + // For fixed-size data we encode < control word | data > for each value + fn serialize_full_zip_fixed( + fixed: FixedWidthDataBlock, + mut repdef: ControlWordIterator, + ) -> LanceBuffer { + let len = fixed.data.len() + repdef.bytes_per_word() * fixed.num_values as usize; + let mut buf = Vec::with_capacity(len); + + // I suppose we can just pad to the nearest byte but I'm not sure we need to worry about this anytime soon + // because it is unlikely compression of large values is going to yield a result that is not byte aligned + assert_eq!( + fixed.bits_per_value % 8, + 0, + "Non-byte aligned full-zip compression not yet supported" + ); + + let bytes_per_value = fixed.bits_per_value as usize / 8; + + for value in fixed.data.chunks_exact(bytes_per_value) { + repdef.append_next(&mut buf); + buf.extend_from_slice(value); + } + + LanceBuffer::Owned(buf) + } + + // For variable-size data we encode < control word | length | data > for each value + fn serialize_full_zip_variable( + mut variable: VariableWidthBlock, + mut repdef: ControlWordIterator, + ) -> LanceBuffer { + let bytes_per_offset = variable.bits_per_offset as usize / 8; + assert_eq!( + variable.bits_per_offset % 8, + 0, + "Only byte-aligned offsets supported" + ); + let len = variable.data.len() + + repdef.bytes_per_word() * variable.num_values as usize + + bytes_per_offset * variable.num_values as usize; + let mut buf = Vec::with_capacity(len); + + // TODO: We may want to bit-pack lengths in the future. We probably don't need + // full bitpacking (which would cause the data to become unaligned) but we could + // bitpack to the nearest word size (e.g. 
u8 / u16 / u32) + match bytes_per_offset { + 4 => { + let offs = variable.offsets.borrow_to_typed_slice::(); + for offsets in offs.as_ref().windows(2) { + repdef.append_next(&mut buf); + buf.extend_from_slice(&(offsets[1] - offsets[0]).to_le_bytes()); + buf.extend_from_slice(&variable.data[offsets[0] as usize..offsets[1] as usize]); + } + } + 8 => { + let offs = variable.offsets.borrow_to_typed_slice::(); + for offsets in offs.as_ref().windows(2) { + repdef.append_next(&mut buf); + buf.extend_from_slice(&(offsets[1] - offsets[0]).to_le_bytes()); + buf.extend_from_slice(&variable.data[offsets[0] as usize..offsets[1] as usize]); + } + } + _ => panic!("Unsupported offset size"), + } + + LanceBuffer::Owned(buf) + } + + /// Serializes data into a single buffer according to the full-zip format which zips + /// together the repetition, definition, and value data into a single buffer. + fn serialize_full_zip( + compressed_data: PerValueDataBlock, + repdef: ControlWordIterator, + ) -> LanceBuffer { + match compressed_data { + PerValueDataBlock::Fixed(fixed) => Self::serialize_full_zip_fixed(fixed, repdef), + PerValueDataBlock::Variable(var) => Self::serialize_full_zip_variable(var, repdef), + } + } + + fn encode_full_zip( + column_idx: u32, + field: &Field, + compression_strategy: &dyn CompressionStrategy, + data: DataBlock, + repdefs: Vec, + row_number: u64, + ) -> Result { + let repdef = RepDefBuilder::serialize(repdefs); + let max_rep = repdef + .repetition_levels + .as_ref() + .map_or(0, |r| r.iter().max().copied().unwrap_or(0)); + let max_def = repdef + .definition_levels + .as_ref() + .map_or(0, |d| d.iter().max().copied().unwrap_or(0)); + let repdef_iter = build_control_word_iterator( + repdef.repetition_levels, + max_rep, + repdef.definition_levels, + max_def, + ); + let bits_rep = repdef_iter.bits_rep(); + let bits_def = repdef_iter.bits_def(); + + let num_values = data.num_values(); + // The validity is encoded in repdef so we can remove it + let data = data.remove_validity(); + + let compressor = compression_strategy.create_per_value(field, &data)?; + let (compressed_data, value_encoding) = compressor.compress(data)?; + + let zipped = Self::serialize_full_zip(compressed_data, repdef_iter); + + let description = ProtobufUtils::full_zip_layout(bits_rep, bits_def, value_encoding); + Ok(EncodedPage { + num_rows: num_values, + column_idx, + data: vec![zipped], + description: PageEncoding::Structural(description), + row_number, + }) + } + // Creates an encode task, consuming all buffered data fn do_flush( &mut self, @@ -1860,12 +2217,12 @@ impl PrimitiveStructuralEncoder { Self::encode_simple_all_null(column_idx, num_values, row_number) } else { let data_block = DataBlock::from_arrays(&arrays, num_values); - log::debug!( - "Encoding column {} with {} rows using mini-block layout", - column_idx, - num_values - ); if Self::is_narrow(&data_block) { + log::debug!( + "Encoding column {} with {} rows using mini-block layout", + column_idx, + num_values + ); Self::encode_miniblock( column_idx, &field, @@ -1875,7 +2232,19 @@ impl PrimitiveStructuralEncoder { row_number, ) } else { - todo!("Full zipped encoding") + log::debug!( + "Encoding column {} with {} rows using full-zip layout", + column_idx, + num_values + ); + Self::encode_full_zip( + column_idx, + &field, + compression_strategy.as_ref(), + data_block, + repdefs, + row_number, + ) } } }) @@ -1896,10 +2265,6 @@ impl PrimitiveStructuralEncoder { DataType::Null => { 
repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len()))); } - DataType::FixedSizeList(_, _) => { - Self::extract_validity_buf(array, repdef); - Self::extract_validity(array.as_fixed_size_list().values(), repdef); - } DataType::Dictionary(_, _) => { unreachable!() } diff --git a/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs b/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs index a6c63d0c40..f827f9a914 100644 --- a/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs +++ b/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs @@ -9,10 +9,10 @@ use lance_core::Result; use log::trace; use crate::{ - data::{DataBlock, FixedSizeListBlock}, - decoder::{PageScheduler, PrimitivePageDecoder}, - encoder::{ArrayEncoder, EncodedArray}, - format::ProtobufUtils, + data::{BlockInfo, DataBlock, FixedSizeListBlock, FixedWidthDataBlock, UsedEncoding}, + decoder::{PageScheduler, PerValueDecompressor, PrimitivePageDecoder}, + encoder::{ArrayEncoder, EncodedArray, PerValueCompressor, PerValueDataBlock}, + format::{pb, ProtobufUtils}, EncodingsIo, }; @@ -123,11 +123,106 @@ impl ArrayEncoder for FslEncoder { dimension: self.dimension as u64, }); - let encoding = ProtobufUtils::fixed_size_list(encoded_data.encoding, self.dimension); + let encoding = ProtobufUtils::fixed_size_list(encoded_data.encoding, self.dimension as u64); Ok(EncodedArray { data, encoding }) } } +/// A compressor for primitive FSLs that flattens each list into a +/// single value. If the inner list has validity then the validity +/// is zipped in with the values. +/// +/// In other words, if the list is FSL [[0, NULL], [4, 10]] then the +/// two buffers start as: +/// +/// values: 0x00 0x?? 0x04 0x0A +/// validity: 0b1011 +/// +/// The output will be: +/// +/// zipped: 0x01 0x00 0x00 0x?? 0x01 0x04 0x01 0x0A +/// +/// Note that we expand validity to be at least a byte per value so this +/// approach is not ideal for small lists, though we should be using mini-block +/// for small lists anyways. 
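+///
+/// When the inner items carry no validity the zip step is skipped entirely and
+/// the list is reinterpreted as a single wider value. For example (the types
+/// here are illustrative), an FSL of dimension 2 over i32 items becomes a
+/// fixed-width block with bits_per_value = 64 and half as many values, which
+/// is exactly what the `FixedWidth` arm of `compress` below does.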
+#[derive(Debug)]
+pub struct FslPerValueCompressor {
+    items_compressor: Box<dyn PerValueCompressor>,
+    dimension: u64,
+}
+
+impl FslPerValueCompressor {
+    pub fn new(items_compressor: Box<dyn PerValueCompressor>, dimension: u64) -> Self {
+        Self {
+            items_compressor,
+            dimension,
+        }
+    }
+}
+
+impl PerValueCompressor for FslPerValueCompressor {
+    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
+        let mut data = data.as_fixed_size_list().unwrap();
+        let flattened = match data.child.as_mut() {
+            DataBlock::FixedWidth(fixed_width) => DataBlock::FixedWidth(FixedWidthDataBlock {
+                bits_per_value: fixed_width.bits_per_value * self.dimension,
+                data: fixed_width.data.borrow_and_clone(),
+                block_info: BlockInfo::new(),
+                num_values: fixed_width.num_values / self.dimension,
+                used_encoding: UsedEncoding::new(),
+            }),
+            DataBlock::VariableWidth(_) => todo!("GH-3111: FSL with variable inner type"),
+            DataBlock::Nullable(_) => todo!("GH-3112: FSL with nullable inner type"),
+            DataBlock::FixedSizeList(_) => todo!("GH-3113: Nested FSLs"),
+            _ => unreachable!(),
+        };
+        let (compressed, encoding) = self.items_compressor.compress(flattened)?;
+        let wrapped_encoding = ProtobufUtils::fixed_size_list(encoding, self.dimension);
+
+        Ok((compressed, wrapped_encoding))
+    }
+}
+
+/// Reverses the process described in [`FslPerValueCompressor`]
+#[derive(Debug)]
+pub struct FslPerValueDecompressor {
+    items_decompressor: Box<dyn PerValueDecompressor>,
+    dimension: u64,
+}
+
+impl FslPerValueDecompressor {
+    pub fn new(items_decompressor: Box<dyn PerValueDecompressor>, dimension: u64) -> Self {
+        Self {
+            items_decompressor,
+            dimension,
+        }
+    }
+}
+
+impl PerValueDecompressor for FslPerValueDecompressor {
+    fn decompress(&self, data: crate::buffer::LanceBuffer, num_values: u64) -> Result<DataBlock> {
+        let decompressed = self.items_decompressor.decompress(data, num_values)?;
+        let unflattened = match decompressed {
+            DataBlock::FixedWidth(fixed_width) => DataBlock::FixedWidth(FixedWidthDataBlock {
+                bits_per_value: fixed_width.bits_per_value / self.dimension,
+                data: fixed_width.data,
+                block_info: BlockInfo::new(),
+                num_values: fixed_width.num_values * self.dimension,
+                used_encoding: UsedEncoding::new(),
+            }),
+            _ => todo!(),
+        };
+        Ok(DataBlock::FixedSizeList(FixedSizeListBlock {
+            child: Box::new(unflattened),
+            dimension: self.dimension,
+        }))
+    }
+
+    fn bits_per_value(&self) -> u64 {
+        self.items_decompressor.bits_per_value()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs
index f9466e61f8..488babdbe1 100644
--- a/rust/lance-encoding/src/encodings/physical/value.rs
+++ b/rust/lance-encoding/src/encodings/physical/value.rs
@@ -12,10 +12,10 @@ use std::sync::{Arc, Mutex};
 use crate::buffer::LanceBuffer;
 use crate::data::{BlockInfo, ConstantDataBlock, DataBlock, FixedWidthDataBlock, UsedEncoding};
-use crate::decoder::{BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor};
+use crate::decoder::{BlockDecompressor, MiniBlockDecompressor, PerValueDecompressor};
 use crate::encoder::{
-    BlockCompressor, FixedPerValueCompressor, MiniBlockChunk, MiniBlockCompressed,
-    MiniBlockCompressor, MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES,
+    BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
+    PerValueDataBlock, MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES,
 };
 use crate::format::pb::{self, ArrayEncoding};
 use crate::format::ProtobufUtils;
@@ -425,7 +425,7 @@ impl MiniBlockDecompressor for ValueDecompressor {
     }
 }
-impl 
FixedPerValueDecompressor for ValueDecompressor { +impl PerValueDecompressor for ValueDecompressor { fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result { MiniBlockDecompressor::decompress(self, data, num_values) } @@ -435,12 +435,12 @@ impl FixedPerValueDecompressor for ValueDecompressor { } } -impl FixedPerValueCompressor for ValueEncoder { - fn compress(&self, data: DataBlock) -> Result<(FixedWidthDataBlock, ArrayEncoding)> { +impl PerValueCompressor for ValueEncoder { + fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, ArrayEncoding)> { let (data, encoding) = match data { DataBlock::FixedWidth(fixed_width) => { let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None); - (fixed_width, encoding) + (PerValueDataBlock::Fixed(fixed_width), encoding) } _ => unimplemented!( "Cannot compress block of type {} with ValueEncoder", @@ -504,6 +504,25 @@ pub(crate) mod tests { } } + lazy_static::lazy_static! { + static ref LARGE_TYPES: Vec = vec![DataType::FixedSizeList( + Arc::new(Field::new("", DataType::Int32, false)), + 128, + )]; + } + + #[rstest] + #[test_log::test(tokio::test)] + async fn test_large_primitive( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { + for data_type in LARGE_TYPES.iter() { + log::info!("Testing encoding for {:?}", data_type); + let field = Field::new("", data_type.clone(), false); + check_round_trip_encoding_random(field, version).await; + } + } + #[test_log::test(tokio::test)] async fn test_miniblock_stress() { // Tests for strange page sizes and batch sizes and validity scenarios for miniblock diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 83b86d5cca..1b21210226 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -193,10 +193,10 @@ impl ProtobufUtils { } } - pub fn fixed_size_list(data: ArrayEncoding, dimension: u32) -> ArrayEncoding { + pub fn fixed_size_list(data: ArrayEncoding, dimension: u64) -> ArrayEncoding { ArrayEncoding { array_encoding: Some(ArrayEncodingEnum::FixedSizeList(Box::new(FixedSizeList { - dimension, + dimension: dimension.try_into().unwrap(), items: Some(Box::new(data)), }))), } @@ -225,6 +225,20 @@ impl ProtobufUtils { } } + pub fn full_zip_layout( + bits_rep: u8, + bits_def: u8, + value_encoding: ArrayEncoding, + ) -> PageLayout { + PageLayout { + layout: Some(Layout::FullZipLayout(pb::FullZipLayout { + bits_rep: bits_rep as u32, + bits_def: bits_def as u32, + value_compression: Some(value_encoding), + })), + } + } + pub fn simple_all_null_layout() -> PageLayout { PageLayout { layout: Some(Layout::AllNullLayout(AllNullLayout {})), diff --git a/rust/lance-encoding/src/repdef.rs b/rust/lance-encoding/src/repdef.rs index c119f309da..82bbe68025 100644 --- a/rust/lance-encoding/src/repdef.rs +++ b/rust/lance-encoding/src/repdef.rs @@ -92,13 +92,13 @@ // This means we end up with 3 bits per level instead of 2. We could instead record // the layers that are all null somewhere else and not require wider rep levels. -use std::sync::Arc; +use std::{iter::Zip, sync::Arc}; use arrow_array::OffsetSizeTrait; use arrow_buffer::{ ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer, }; -use lance_core::{Error, Result}; +use lance_core::{utils::bit::log_2_ceil, Error, Result}; use snafu::{location, Location}; // We assume 16 bits is good enough for rep-def levels. 
This gives us @@ -541,6 +541,442 @@ impl RepDefUnraveler { } } +/// A [`ControlWordIterator`] when there are both repetition and definition levels +/// +/// The iterator will put the repetition level in the upper bits and the definition +/// level in the lower bits. The number of bits used for each level is determined +/// by the width of the repetition and definition levels. +#[derive(Debug)] +pub struct BinaryControlWordIterator, W> { + repdef: I, + def_width: usize, + rep_mask: u16, + def_mask: u16, + bits_rep: u8, + bits_def: u8, + phantom: std::marker::PhantomData, +} + +impl> BinaryControlWordIterator { + fn append_next(&mut self, buf: &mut Vec) { + let next = self.repdef.next().unwrap(); + let control_word: u8 = + (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8); + buf.push(control_word); + } +} + +impl> BinaryControlWordIterator { + fn append_next(&mut self, buf: &mut Vec) { + let next = self.repdef.next().unwrap(); + let control_word: u16 = + ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask); + let control_word = control_word.to_le_bytes(); + buf.push(control_word[0]); + buf.push(control_word[1]); + } +} + +impl> BinaryControlWordIterator { + fn append_next(&mut self, buf: &mut Vec) { + let next = self.repdef.next().unwrap(); + let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width) + + ((next.1 & self.def_mask) as u32); + let control_word = control_word.to_le_bytes(); + buf.push(control_word[0]); + buf.push(control_word[1]); + buf.push(control_word[2]); + buf.push(control_word[3]); + } +} + +/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels +#[derive(Debug)] +pub struct UnaryControlWordIterator, W> { + repdef: I, + level_mask: u16, + bits_rep: u8, + bits_def: u8, + phantom: std::marker::PhantomData, +} + +impl> UnaryControlWordIterator { + fn append_next(&mut self, buf: &mut Vec) { + let next = self.repdef.next().unwrap(); + buf.push((next & self.level_mask) as u8); + } +} + +impl> UnaryControlWordIterator { + fn append_next(&mut self, buf: &mut Vec) { + let next = self.repdef.next().unwrap() & self.level_mask; + let control_word = next.to_le_bytes(); + buf.push(control_word[0]); + buf.push(control_word[1]); + } +} + +impl> UnaryControlWordIterator { + fn append_next(&mut self, buf: &mut Vec) { + let next = (self.repdef.next().unwrap() & self.level_mask) as u32; + let control_word = next.to_le_bytes(); + buf.push(control_word[0]); + buf.push(control_word[1]); + buf.push(control_word[2]); + buf.push(control_word[3]); + } +} + +/// A [`ControlWordIterator`] when there are no repetition or definition levels +#[derive(Debug)] +pub struct NilaryControlWordIterator; + +/// Helper function to get a bit mask of the given width +fn get_mask(width: u16) -> u16 { + (1 << width) - 1 +} + +/// An iterator that generates control words from repetition and definition levels +/// +/// "Control word" is just a fancy term for a single u8/u16/u32 that contains both +/// the repetition and definition in it. +/// +/// In the large majority of case we only need a single byte to represent both the +/// repetition and definition levels. However, if there is deep nesting then we may +/// need two bytes. In the worst case we need 4 bytes though this suggests hundreds of +/// levels of nesting which seems unlikely to encounter in practice. 
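+///
+/// For example, with 4 bits of repetition and 4 bits of definition the pair
+/// (rep = 7, def = 3) packs into the single byte 0b0111_0011: the repetition
+/// level occupies the upper bits and the definition level the lower bits.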
+#[derive(Debug)] +pub enum ControlWordIterator { + Binary8(BinaryControlWordIterator, std::vec::IntoIter>, u8>), + Binary16(BinaryControlWordIterator, std::vec::IntoIter>, u16>), + Binary32(BinaryControlWordIterator, std::vec::IntoIter>, u32>), + Unary8(UnaryControlWordIterator, u8>), + Unary16(UnaryControlWordIterator, u16>), + Unary32(UnaryControlWordIterator, u32>), + Nilary(NilaryControlWordIterator), +} + +impl ControlWordIterator { + /// Appends the next control word to the buffer + pub fn append_next(&mut self, buf: &mut Vec) { + match self { + Self::Binary8(iter) => iter.append_next(buf), + Self::Binary16(iter) => iter.append_next(buf), + Self::Binary32(iter) => iter.append_next(buf), + Self::Unary8(iter) => iter.append_next(buf), + Self::Unary16(iter) => iter.append_next(buf), + Self::Unary32(iter) => iter.append_next(buf), + Self::Nilary(_) => {} + } + } + + /// Returns the number of bytes per control word + pub fn bytes_per_word(&self) -> usize { + match self { + Self::Binary8(_) => 1, + Self::Binary16(_) => 2, + Self::Binary32(_) => 4, + Self::Unary8(_) => 1, + Self::Unary16(_) => 2, + Self::Unary32(_) => 4, + Self::Nilary(_) => 0, + } + } + + /// Returns the number of bits used for the repetition level + pub fn bits_rep(&self) -> u8 { + match self { + Self::Binary8(iter) => iter.bits_rep, + Self::Binary16(iter) => iter.bits_rep, + Self::Binary32(iter) => iter.bits_rep, + Self::Unary8(iter) => iter.bits_rep, + Self::Unary16(iter) => iter.bits_rep, + Self::Unary32(iter) => iter.bits_rep, + Self::Nilary(_) => 0, + } + } + + /// Returns the number of bits used for the definition level + pub fn bits_def(&self) -> u8 { + match self { + Self::Binary8(iter) => iter.bits_def, + Self::Binary16(iter) => iter.bits_def, + Self::Binary32(iter) => iter.bits_def, + Self::Unary8(iter) => iter.bits_def, + Self::Unary16(iter) => iter.bits_def, + Self::Unary32(iter) => iter.bits_def, + Self::Nilary(_) => 0, + } + } +} + +/// Builds a [`ControlWordIterator`] from repetition and definition levels +/// by first calculating the width needed and then creating the iterator +/// with the appropriate width +pub fn build_control_word_iterator( + rep: Option>, + max_rep: u16, + def: Option>, + max_def: u16, +) -> ControlWordIterator { + let rep_width = if max_rep == 0 { + 0 + } else { + log_2_ceil(max_rep as u32) as u16 + }; + let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) }; + let def_width = if max_def == 0 { + 0 + } else { + log_2_ceil(max_def as u32) as u16 + }; + let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) }; + let total_width = rep_width + def_width; + match (rep, def) { + (Some(rep), Some(def)) => { + let iter = rep.into_iter().zip(def); + let def_width = def_width as usize; + if total_width <= 8 { + ControlWordIterator::Binary8(BinaryControlWordIterator { + repdef: iter, + rep_mask, + def_mask, + def_width, + bits_rep: rep_width as u8, + bits_def: def_width as u8, + phantom: std::marker::PhantomData, + }) + } else if total_width <= 16 { + ControlWordIterator::Binary16(BinaryControlWordIterator { + repdef: iter, + rep_mask, + def_mask, + def_width, + bits_rep: rep_width as u8, + bits_def: def_width as u8, + phantom: std::marker::PhantomData, + }) + } else { + ControlWordIterator::Binary32(BinaryControlWordIterator { + repdef: iter, + rep_mask, + def_mask, + def_width, + bits_rep: rep_width as u8, + bits_def: def_width as u8, + phantom: std::marker::PhantomData, + }) + } + } + (Some(lev), None) => { + let iter = lev.into_iter(); + if total_width <= 8 { + 
ControlWordIterator::Unary8(UnaryControlWordIterator { + repdef: iter, + level_mask: rep_mask, + bits_rep: total_width as u8, + bits_def: 0, + phantom: std::marker::PhantomData, + }) + } else if total_width <= 16 { + ControlWordIterator::Unary16(UnaryControlWordIterator { + repdef: iter, + level_mask: rep_mask, + bits_rep: total_width as u8, + bits_def: 0, + phantom: std::marker::PhantomData, + }) + } else { + ControlWordIterator::Unary32(UnaryControlWordIterator { + repdef: iter, + level_mask: rep_mask, + bits_rep: total_width as u8, + bits_def: 0, + phantom: std::marker::PhantomData, + }) + } + } + (None, Some(lev)) => { + let iter = lev.into_iter(); + if total_width <= 8 { + ControlWordIterator::Unary8(UnaryControlWordIterator { + repdef: iter, + level_mask: def_mask, + bits_rep: 0, + bits_def: total_width as u8, + phantom: std::marker::PhantomData, + }) + } else if total_width <= 16 { + ControlWordIterator::Unary16(UnaryControlWordIterator { + repdef: iter, + level_mask: def_mask, + bits_rep: 0, + bits_def: total_width as u8, + phantom: std::marker::PhantomData, + }) + } else { + ControlWordIterator::Unary32(UnaryControlWordIterator { + repdef: iter, + level_mask: def_mask, + bits_rep: 0, + bits_def: total_width as u8, + phantom: std::marker::PhantomData, + }) + } + } + (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator {}), + } +} + +/// A parser to unwrap control words into repetition and definition levels +/// +/// This is the inverse of the [`ControlWordIterator`]. +#[derive(Copy, Clone, Debug)] +pub enum ControlWordParser { + // First item is the bits to shift, second is the mask to apply (the mask can be + // calculated from the bits to shift but we don't want to calculate it each time) + BOTH8(u8, u32), + BOTH16(u8, u32), + BOTH32(u8, u32), + REP8, + REP16, + REP32, + DEF8, + DEF16, + DEF32, + NIL, +} + +impl ControlWordParser { + fn parse_both( + src: &[u8], + dst_rep: &mut Vec, + dst_def: &mut Vec, + bits_to_shift: u8, + mask_to_apply: u32, + ) { + match WORD_SIZE { + 1 => { + let word = src[0]; + let rep = word >> bits_to_shift; + let def = word & (mask_to_apply as u8); + dst_rep.push(rep as u16); + dst_def.push(def as u16); + } + 2 => { + let word = u16::from_le_bytes([src[0], src[1]]); + let rep = word >> bits_to_shift; + let def = word & mask_to_apply as u16; + dst_rep.push(rep); + dst_def.push(def); + } + 4 => { + let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]); + let rep = word >> bits_to_shift; + let def = word & mask_to_apply; + dst_rep.push(rep as u16); + dst_def.push(def as u16); + } + _ => unreachable!(), + } + } + + fn parse_one(src: &[u8], dst: &mut Vec) { + match WORD_SIZE { + 1 => { + let word = src[0]; + dst.push(word as u16); + } + 2 => { + let word = u16::from_le_bytes([src[0], src[1]]); + dst.push(word); + } + 4 => { + let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]); + dst.push(word as u16); + } + _ => unreachable!(), + } + } + + /// Returns the number of bytes per control word + pub fn bytes_per_word(&self) -> usize { + match self { + Self::BOTH8(..) => 1, + Self::BOTH16(..) => 2, + Self::BOTH32(..) => 4, + Self::REP8 => 1, + Self::REP16 => 2, + Self::REP32 => 4, + Self::DEF8 => 1, + Self::DEF16 => 2, + Self::DEF32 => 4, + Self::NIL => 0, + } + } + + /// Appends the next control word to the rep & def buffers + /// + /// `src` should be pointing at the first byte (little endian) of the control word + /// + /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to. 
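+    /// For example, a `BOTH8` parser built with `bits_rep = 4` and `bits_def = 4`
+    /// splits the byte 0b0111_0011 back into rep = 7 and def = 3, inverting the
+    /// packing done by the [`ControlWordIterator`].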
+ /// They will not be appended to if not needed. + pub fn parse(&self, src: &[u8], dst_rep: &mut Vec, dst_def: &mut Vec) { + match self { + Self::BOTH8(bits_to_shift, mask_to_apply) => { + Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply) + } + Self::BOTH16(bits_to_shift, mask_to_apply) => { + Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply) + } + Self::BOTH32(bits_to_shift, mask_to_apply) => { + Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply) + } + Self::REP8 => Self::parse_one::<1>(src, dst_rep), + Self::REP16 => Self::parse_one::<2>(src, dst_rep), + Self::REP32 => Self::parse_one::<4>(src, dst_rep), + Self::DEF8 => Self::parse_one::<1>(src, dst_def), + Self::DEF16 => Self::parse_one::<2>(src, dst_def), + Self::DEF32 => Self::parse_one::<4>(src, dst_def), + Self::NIL => {} + } + } + + /// Creates a new parser from the number of bits used for the repetition and definition levels + pub fn new(bits_rep: u8, bits_def: u8) -> Self { + let total_bits = bits_rep + bits_def; + + enum WordSize { + One, + Two, + Four, + } + + let word_size = if total_bits <= 8 { + WordSize::One + } else if total_bits <= 16 { + WordSize::Two + } else { + WordSize::Four + }; + + match (bits_rep > 0, bits_def > 0, word_size) { + (false, false, _) => Self::NIL, + (false, true, WordSize::One) => Self::DEF8, + (false, true, WordSize::Two) => Self::DEF16, + (false, true, WordSize::Four) => Self::DEF32, + (true, false, WordSize::One) => Self::REP8, + (true, false, WordSize::Two) => Self::REP16, + (true, false, WordSize::Four) => Self::REP32, + (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32), + (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32), + (true, true, WordSize::Four) => { + Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32) + } + } + } +} + #[cfg(test)] mod tests { use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; @@ -688,4 +1124,108 @@ mod tests { assert_eq!(vec![2, 1, 0, 2, 0, 2, 0, 1, 0], rep); assert_eq!(vec![0, 0, 0, 3, 3, 2, 2, 0, 1], def); } + + #[test] + fn test_control_words() { + // Convert to control words, verify expected, convert back, verify same as original + fn check( + rep: Vec, + def: Vec, + expected_values: Vec, + expected_bytes_per_word: usize, + expected_bits_rep: u8, + expected_bits_def: u8, + ) { + let num_vals = rep.len().max(def.len()); + let max_rep = rep.iter().max().copied().unwrap_or(0); + let max_def = def.iter().max().copied().unwrap_or(0); + + let in_rep = if rep.is_empty() { + None + } else { + Some(rep.clone()) + }; + let in_def = if def.is_empty() { + None + } else { + Some(def.clone()) + }; + + let mut iter = super::build_control_word_iterator(in_rep, max_rep, in_def, max_def); + assert_eq!(iter.bytes_per_word(), expected_bytes_per_word); + assert_eq!(iter.bits_rep(), expected_bits_rep); + assert_eq!(iter.bits_def(), expected_bits_def); + let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word()); + + for _ in 0..num_vals { + iter.append_next(&mut cw_vec); + } + + assert_eq!(expected_values, cw_vec); + + let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def); + + let mut rep_out = Vec::with_capacity(num_vals); + let mut def_out = Vec::with_capacity(num_vals); + + if expected_bytes_per_word > 0 { + for slice in cw_vec.chunks_exact(expected_bytes_per_word) { + parser.parse(slice, &mut rep_out, &mut def_out); + } + } + + assert_eq!(rep, rep_out); + assert_eq!(def, 
def_out); + } + + // Each will need 4 bits and so we should get 1-byte control words + let rep = vec![0_u16, 7, 3, 2, 9, 8, 12, 5]; + let def = vec![5_u16, 3, 1, 2, 12, 15, 0, 2]; + let expected = vec![ + 0b00000101, // 0, 5 + 0b01110011, // 7, 3 + 0b00110001, // 3, 1 + 0b00100010, // 2, 2 + 0b10011100, // 9, 12 + 0b10001111, // 8, 15 + 0b11000000, // 12, 0 + 0b01010010, // 5, 2 + ]; + check(rep, def, expected, 1, 4, 4); + + // Now we need 5 bits for def so we get 2-byte control words + let rep = vec![0_u16, 7, 3, 2, 9, 8, 12, 5]; + let def = vec![5_u16, 3, 1, 2, 12, 22, 0, 2]; + let expected = vec![ + 0b00000101, 0b00000000, // 0, 5 + 0b11100011, 0b00000000, // 7, 3 + 0b01100001, 0b00000000, // 3, 1 + 0b01000010, 0b00000000, // 2, 2 + 0b00101100, 0b00000001, // 9, 12 + 0b00010110, 0b00000001, // 8, 22 + 0b10000000, 0b00000001, // 12, 0 + 0b10100010, 0b00000000, // 5, 2 + ]; + check(rep, def, expected, 2, 4, 5); + + // Just rep, 4 bits so 1 byte each + let levels = vec![0_u16, 7, 3, 2, 9, 8, 12, 5]; + let expected = vec![ + 0b00000000, // 0 + 0b00000111, // 7 + 0b00000011, // 3 + 0b00000010, // 2 + 0b00001001, // 9 + 0b00001000, // 8 + 0b00001100, // 12 + 0b00000101, // 5 + ]; + check(levels.clone(), Vec::default(), expected.clone(), 1, 4, 0); + + // Just def + check(Vec::default(), levels, expected, 1, 0, 4); + + // No rep, no def, no bytes + check(Vec::default(), Vec::default(), Vec::default(), 0, 0, 0); + } } From 2e43b23407473253ec59aa9c7a5d6ee8b336321d Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 11 Nov 2024 15:37:03 -0800 Subject: [PATCH 2/3] Make endianess assertion a compile-time check instead of a runtime check --- rust/lance-core/src/utils/bit.rs | 9 --------- rust/lance-encoding/src/buffer.rs | 7 ++----- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/rust/lance-core/src/utils/bit.rs b/rust/lance-core/src/utils/bit.rs index 345e5f67cc..75a13e783a 100644 --- a/rust/lance-core/src/utils/bit.rs +++ b/rust/lance-core/src/utils/bit.rs @@ -59,15 +59,6 @@ pub fn log_2_ceil(val: u32) -> u32 { } } -/// Asserts if the currently running system is not little_endian -pub fn check_little_endian() { - assert_eq!( - 1u16.to_le_bytes(), - [1, 0], - "This code is for little-endian systems only" - ); -} - #[cfg(test)] pub mod tests { diff --git a/rust/lance-encoding/src/buffer.rs b/rust/lance-encoding/src/buffer.rs index abca880cd9..832d7b90df 100644 --- a/rust/lance-encoding/src/buffer.rs +++ b/rust/lance-encoding/src/buffer.rs @@ -8,10 +8,7 @@ use std::{ops::Deref, ptr::NonNull, sync::Arc}; use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer}; use snafu::{location, Location}; -use lance_core::{ - utils::bit::{check_little_endian, is_pwr_two}, - Error, Result, -}; +use lance_core::{utils::bit::is_pwr_two, Error, Result}; /// A copy-on-write byte buffer /// @@ -225,8 +222,8 @@ impl LanceBuffer { /// Reinterprets a LanceBuffer into a Vec /// /// If the underlying buffer is not properly aligned, this will involve a copy of the data + #[cfg(target_endian = "little")] pub fn borrow_to_typed_slice(&mut self) -> impl AsRef<[T]> { - check_little_endian(); let align = std::mem::align_of::(); let is_aligned = self.as_ptr().align_offset(align) == 0; if self.len() % std::mem::size_of::() != 0 { From 7d8a7142f5500eaed918afa58030923aa0233495 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 13 Nov 2024 06:43:26 -0800 Subject: [PATCH 3/3] Change where we enforce endianness --- rust/lance-encoding/src/buffer.rs | 6 +++++- rust/lance-encoding/src/lib.rs | 6 ++++++ 2 
files changed, 11 insertions(+), 1 deletion(-) diff --git a/rust/lance-encoding/src/buffer.rs b/rust/lance-encoding/src/buffer.rs index 832d7b90df..f62f447a6b 100644 --- a/rust/lance-encoding/src/buffer.rs +++ b/rust/lance-encoding/src/buffer.rs @@ -222,7 +222,11 @@ impl LanceBuffer { /// Reinterprets a LanceBuffer into a Vec /// /// If the underlying buffer is not properly aligned, this will involve a copy of the data - #[cfg(target_endian = "little")] + /// + /// Note: doing this sort of re-interpretation generally makes assumptions about the endianness + /// of the data. Lance does not support big-endian machines so this is safe. However, if we end + /// up supporting big-endian machines in the future, then any use of this method will need to be + /// carefully reviewed. pub fn borrow_to_typed_slice(&mut self) -> impl AsRef<[T]> { let align = std::mem::align_of::(); let is_aligned = self.as_ptr().align_offset(align) == 0; diff --git a/rust/lance-encoding/src/lib.rs b/rust/lance-encoding/src/lib.rs index d4261ae170..19bcd721ed 100644 --- a/rust/lance-encoding/src/lib.rs +++ b/rust/lance-encoding/src/lib.rs @@ -21,6 +21,12 @@ pub mod statistics; pub mod testing; pub mod version; +// We can definitely add support for big-endian machines someday. However, it's not a priority and +// would involve extensive testing (probably through emulation) to ensure that the encodings are +// correct. +#[cfg(not(target_endian = "little"))] +compile_error!("Lance encodings only support little-endian systems."); + /// A trait for an I/O service /// /// This represents the I/O API that the encoders and decoders need in order to operate.