From 964fee3802b957eb5c0b5103c0c39af70f05f0e3 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Wed, 30 Oct 2024 16:53:17 +0000 Subject: [PATCH 01/12] bitpack with miniblock --- protos/encodings.proto | 9 + python/play.py | 96 ++++++++ rust/lance-encoding/Cargo.toml | 1 + rust/lance-encoding/src/decoder.rs | 4 + rust/lance-encoding/src/encoder.rs | 11 +- rust/lance-encoding/src/encodings/physical.rs | 1 + .../encodings/physical/bitpack_fastlanes.rs | 214 +++++++++++++++++- rust/lance-encoding/src/format.rs | 13 +- .../src/scalar/inverted/builder.rs | 4 +- .../src/scalar/inverted/tokenizer.rs | 2 +- 10 files changed, 346 insertions(+), 9 deletions(-) create mode 100644 python/play.py diff --git a/protos/encodings.proto b/protos/encodings.proto index 6ad252dece..66adb647d9 100644 --- a/protos/encodings.proto +++ b/protos/encodings.proto @@ -211,6 +211,14 @@ message BitpackedForNonNeg { Buffer buffer = 3; } +message Bitpack2 { + // the number of bits of the uncompressed value. e.g. for a u32, this will be 32 + uint64 uncompressed_bits_per_value = 2; + + // The items in the list + Buffer buffer = 3; +} + // An array encoding for shredded structs that will never be null // // There is no actual data in this column. @@ -263,6 +271,7 @@ message ArrayEncoding { FixedSizeBinary fixed_size_binary = 11; BitpackedForNonNeg bitpacked_for_non_neg = 12; Constant constant = 13; + Bitpack2 bitpack2 = 14; } } diff --git a/python/play.py b/python/play.py new file mode 100644 index 0000000000..58e29be33e --- /dev/null +++ b/python/play.py @@ -0,0 +1,96 @@ +import pyarrow as pa +import pyarrow.parquet as pq +import datetime +import os +import numpy as np +from lance.file import LanceFileReader, LanceFileWriter + +def format_throughput(value): + return f"{value:.2f} GiB/s" + +parquet_file_path = "/home/admin/tmp/int64_column.parquet" +lance_file_path = "/home/admin/tmp/int64_column.lance" + +# Generate 1024 * 1024 * 512 random values modulo 1024 using NumPy +values = np.random.randint(0, 1024, size=1024 * 1024 * 64 + 1, dtype=np.int64) +#values = [8] * 1024 +int64_array = pa.array(values, type=pa.int64()) +table = pa.Table.from_arrays([int64_array], names=['int64_column']) + +values = np.random.randint(0, 1024, size=1024 * 1024 * 4 + 1, dtype=np.int32) +#values = [8] * 1024 +int32_array = pa.array(values, type=pa.int32()) +table = pa.Table.from_arrays([int32_array], names=['int_column']) + +start = datetime.datetime.now() +# Write the table to a Parquet file +pq.write_table(table, parquet_file_path) +end = datetime.datetime.now() +elapsed_parquet = (end - start).total_seconds() +print(f"Parquet write time: {elapsed_parquet:.2f}s") + +start = datetime.datetime.now() +# Write the table to a Lance file +with LanceFileWriter(lance_file_path, version="2.1") as writer: + writer.write_batch(table) +end = datetime.datetime.now() +elapsed_lance = (end - start).total_seconds() +print(f"Lance write time: {elapsed_lance:.2f}s") + +# Flush file from kernel cache +os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') + +# Measure read time for Parquet file with batch size +batch_size = 32 * 1024 +start = datetime.datetime.now() +parquet_file = pq.ParquetFile(parquet_file_path) +batches = parquet_file.iter_batches(batch_size=batch_size) +tab_parquet = pa.Table.from_batches(batches) +end = datetime.datetime.now() +elapsed_parquet = (end - start).total_seconds() + +# Flush file from kernel cache again before reading Lance file +os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') + +# Measure read time for 
Lance file with batch size +start = datetime.datetime.now() +tab_lance = LanceFileReader(lance_file_path).read_all(batch_size=batch_size).to_table() +end = datetime.datetime.now() +elapsed_lance = (end - start).total_seconds() + +num_rows = tab_parquet['int_column'].length() + +print(f"Number of rows: {num_rows}") +print(f"Parquet read time: {elapsed_parquet:.2f}s") +print(f"Lance read time: {elapsed_lance:.2f}s") + +# Compute total memory size +parquet_memory_size = tab_parquet.get_total_buffer_size() +lance_memory_size = tab_lance.get_total_buffer_size() + +# Convert memory size to GiB +parquet_memory_size_gib = parquet_memory_size / (1024 * 1024 * 1024) +lance_memory_size_gib = lance_memory_size / (1024 * 1024 * 1024) + +# Compute read throughput in GiB/sec +throughput_parquet_gib = parquet_memory_size_gib / elapsed_parquet +throughput_lance_gib = lance_memory_size_gib / elapsed_lance + +# Format throughput values +formatted_throughput_parquet_gib = format_throughput(throughput_parquet_gib) +formatted_throughput_lance_gib = format_throughput(throughput_lance_gib) + +print(f"Parquet read throughput: {formatted_throughput_parquet_gib}") +print(f"Lance read throughput: {formatted_throughput_lance_gib}") + +# Check file sizes +lance_file_size = os.path.getsize(lance_file_path) +lance_file_size_mib = lance_file_size // 1048576 +parquet_file_size = os.path.getsize(parquet_file_path) +parquet_file_size_mib = parquet_file_size // 1048576 + +print(f"Parquet file size: {parquet_file_size} bytes ({parquet_file_size_mib:,} MiB)") +print(f"Lance file size: {lance_file_size} bytes ({lance_file_size_mib:,} MiB)") + +# Assert that the tables are equal +assert tab_parquet == tab_lance \ No newline at end of file diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml index 4cc94ae4be..56205f2f98 100644 --- a/rust/lance-encoding/Cargo.toml +++ b/rust/lance-encoding/Cargo.toml @@ -42,6 +42,7 @@ bytemuck = "=1.18.0" arrayref = "0.3.7" paste = "1.0.15" seq-macro = "0.3.5" +byteorder.workspace = true [dev-dependencies] lance-testing.workspace = true diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 25ff64e923..b6c72a5caa 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -246,6 +246,7 @@ use crate::encodings::logical::primitive::{ use crate::encodings::logical::r#struct::{ SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler, }; +use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor; use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor}; use crate::encodings::physical::{ColumnBuffers, FileBuffers}; use crate::format::pb::{self, column_encoding}; @@ -490,6 +491,9 @@ impl DecompressorStrategy for CoreDecompressorStrategy { pb::array_encoding::ArrayEncoding::Flat(flat) => { Ok(Box::new(ValueDecompressor::new(flat))) } + pb::array_encoding::ArrayEncoding::Bitpack2(description) => { + Ok(Box::new(BitpackMiniBlockDecompressor::new(description))) + } _ => todo!(), } } diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 837da5769c..96aa0c724b 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -22,8 +22,10 @@ use crate::encodings::logical::blob::BlobFieldEncoder; use crate::encodings::logical::primitive::PrimitiveStructuralEncoder; use crate::encodings::logical::r#struct::StructFieldEncoder; use 
crate::encodings::logical::r#struct::StructStructuralEncoder; -use crate::encodings::physical::bitpack_fastlanes::compute_compressed_bit_width_for_non_neg; use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder; +use crate::encodings::physical::bitpack_fastlanes::{ + compute_compressed_bit_width_for_non_neg, BitpackMiniBlockEncoder, +}; use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme}; use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder; use crate::encodings::physical::fsst::FsstArrayEncoder; @@ -773,9 +775,14 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { fn create_miniblock_compressor( &self, field: &Field, - _data: &DataBlock, + data: &DataBlock, ) -> Result> { assert!(field.data_type().byte_width() > 0); + if let DataBlock::FixedWidth(ref fixed_width_data) = data { + if fixed_width_data.bits_per_value <= 64 { + return Ok(Box::new(BitpackMiniBlockEncoder::default())); + } + } Ok(Box::new(ValueEncoder::default())) } diff --git a/rust/lance-encoding/src/encodings/physical.rs b/rust/lance-encoding/src/encodings/physical.rs index cbedb5dd17..b349513efa 100644 --- a/rust/lance-encoding/src/encodings/physical.rs +++ b/rust/lance-encoding/src/encodings/physical.rs @@ -283,6 +283,7 @@ pub fn decoder_from_array_encoding( pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(), // 2.1 only pb::array_encoding::ArrayEncoding::Constant(_) => unreachable!(), + pb::array_encoding::ArrayEncoding::Bitpack2(_) => unreachable!(), } } diff --git a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs index 8900418e68..ca8029c94e 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs @@ -8,6 +8,7 @@ use arrow::datatypes::{ }; use arrow_array::{Array, PrimitiveArray}; use arrow_schema::DataType; +use byteorder::{ByteOrder, LittleEndian}; use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use log::trace; @@ -20,9 +21,12 @@ use crate::buffer::LanceBuffer; use crate::compression_algo::fastlanes::BitPacking; use crate::data::{BlockInfo, UsedEncoding}; use crate::data::{DataBlock, FixedWidthDataBlock, NullableDataBlock}; -use crate::decoder::{PageScheduler, PrimitivePageDecoder}; -use crate::encoder::{ArrayEncoder, EncodedArray}; -use crate::format::ProtobufUtils; +use crate::decoder::{MiniBlockDecompressor, PageScheduler, PrimitivePageDecoder}; +use crate::encoder::{ + ArrayEncoder, EncodedArray, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, +}; +use crate::format::{pb, ProtobufUtils}; +use crate::statistics::{GetStat, Stat}; use arrow::array::ArrayRef; use bytemuck::cast_slice; const ELEMS_PER_CHUNK: u64 = 1024; @@ -1560,3 +1564,207 @@ mod tests { // check_round_trip_bitpacked(arr).await; // } } + +// This macro chunks the FixedWidth DataBlock, bitpacks them, +// put the bit-width parameter in front of each chunk, +// the bit-width parameter has the same bit-width as the uncompressed DataBlock +macro_rules! 
chunk_data_impl { + ($data:expr, $data_type:ty) => {{ + let data_buffer = $data.data.borrow_to_typed_slice::<$data_type>(); + let data_buffer = data_buffer.as_ref(); + + let bit_widths = $data + .get_stat(Stat::BitWidth) + .expect("FixedWidthDataBlock should have valid bit width statistics"); + println!("bit_widths statistics got"); + let bit_widths_array = bit_widths + .as_any() + .downcast_ref::>() + .unwrap(); + + let (packed_chunk_sizes, total_size) = bit_widths_array + .values() + .iter() + .map(|&bit_width| { + let chunk_size = ((1024 * bit_width) / $data.bits_per_value) as usize; + (chunk_size, chunk_size + 1) + }) + .fold( + (Vec::with_capacity(bit_widths_array.len()), 0), + |(mut sizes, total), (size, inc)| { + sizes.push(size); + (sizes, total + inc) + }, + ); + + let mut output: Vec<$data_type> = Vec::with_capacity(total_size); + let mut chunks = Vec::with_capacity(bit_widths_array.len()); + + for i in 0..bit_widths_array.len() - 1 { + let start_elem = i * ELEMS_PER_CHUNK as usize; + let bit_width = bit_widths_array.value(i) as $data_type; + output.push(bit_width); + let output_len = output.len(); + unsafe { + output.set_len(output_len + packed_chunk_sizes[i]); + BitPacking::unchecked_pack( + bit_width as usize, + &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize], + &mut output[output_len..][..packed_chunk_sizes[i]], + ); + } + chunks.push(MiniBlockChunk { + num_bytes: ((1 + packed_chunk_sizes[i]) * std::mem::size_of::<$data_type>()) as u16, + log_num_values: 10, + }); + } + + // Handle the last chunk + let last_chunk_elem_num = if $data.num_values % ELEMS_PER_CHUNK == 0 { + 1024 + } else { + $data.num_values % ELEMS_PER_CHUNK + }; + let mut last_chunk = vec![0; ELEMS_PER_CHUNK as usize]; + last_chunk[..last_chunk_elem_num as usize].clone_from_slice( + &data_buffer[$data.num_values as usize - last_chunk_elem_num as usize..], + ); + let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as $data_type; + output.push(bit_width); + let output_len = output.len(); + unsafe { + output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]); + BitPacking::unchecked_pack( + bit_width as usize, + &last_chunk, + &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]], + ); + } + chunks.push(MiniBlockChunk { + num_bytes: ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) + * std::mem::size_of::<$data_type>()) as u16, + log_num_values: 0, + }); + + ( + MiniBlockCompressed { + data: LanceBuffer::reinterpret_vec(output), + chunks, + num_values: $data.num_values, + }, + ProtobufUtils::bitpack2($data.bits_per_value, 0), + ) + }}; +} + +/// A compression strategy that writes fixed-width data as-is (no compression) +#[derive(Debug, Default)] +pub struct BitpackMiniBlockEncoder {} + +impl BitpackMiniBlockEncoder { + fn chunk_data( + &self, + mut data: FixedWidthDataBlock, + ) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) { + assert!(data.bits_per_value % 8 == 0); + match data.bits_per_value { + 8 => chunk_data_impl!(data, u8), + 16 => chunk_data_impl!(data, u16), + 32 => chunk_data_impl!(data, u32), + 64 => chunk_data_impl!(data, u64), + _ => unreachable!(), + } + } +} + +impl MiniBlockCompressor for BitpackMiniBlockEncoder { + fn compress( + &self, + chunk: DataBlock, + ) -> Result<( + crate::encoder::MiniBlockCompressed, + crate::format::pb::ArrayEncoding, + )> { + match chunk { + DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)), + _ => Err(Error::InvalidInput { + source: format!( + "Cannot compress a data 
block of type {} with ValueEncoder", + chunk.name() + ) + .into(), + location: location!(), + }), + } + } +} + +/// A decompressor for fixed-width data that has +/// been written, as-is, to disk in single contiguous array +#[derive(Debug)] +pub struct BitpackMiniBlockDecompressor { + uncompressed_bit_width: u64, +} + +impl BitpackMiniBlockDecompressor { + pub fn new(description: &pb::Bitpack2) -> Self { + Self { + uncompressed_bit_width: description.uncompressed_bits_per_value, + } + } +} + +impl MiniBlockDecompressor for BitpackMiniBlockDecompressor { + fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result { + assert!(data.len() >= 8); + assert!(num_values <= ELEMS_PER_CHUNK); + + macro_rules! decompress_case { + ($type:ty) => {{ + let bit_width = std::mem::size_of::<$type>() * 8; + let mut decompressed = vec![0 as $type; ELEMS_PER_CHUNK as usize]; + + // Copy for memory alignment + let chunk_in_u8: Vec = data.to_vec(); + let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<$type>()]; + let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<$type>()); + let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<$type>()..]); + + // The bit-packed chunk should have number of bytes (bit_width_value * ELEMS_PER_CHUNK / 8) + assert!(chunk.len() * std::mem::size_of::<$type>() == (bit_width_value * ELEMS_PER_CHUNK as u64) as usize / 8); + + unsafe { + BitPacking::unchecked_unpack( + bit_width_value as usize, + chunk, + &mut decompressed, + ); + } + + decompressed.shrink_to(num_values as usize); + Ok(DataBlock::FixedWidth(FixedWidthDataBlock { + data: LanceBuffer::reinterpret_vec(decompressed), + bits_per_value: bit_width as u64, + num_values, + block_info: BlockInfo::new(), + used_encoding: UsedEncoding::new(), + })) + }}; + } + + match self.uncompressed_bit_width { + 8 => decompress_case!(u8), + 16 => decompress_case!(u16), + 32 => decompress_case!(u32), + 64 => decompress_case!(u64), + _ => todo!(), + } + } +} + +mod test { + #[test] + fn hello() { + println!("hello"); + } +} diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 647e683b86..8938636589 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -19,7 +19,7 @@ use pb::{ buffer::BufferType, nullable::{AllNull, NoNull, Nullability, SomeNull}, page_layout::Layout, - AllNullLayout, ArrayEncoding, Binary, Bitpacked, BitpackedForNonNeg, Dictionary, + AllNullLayout, ArrayEncoding, Binary, Bitpack2, Bitpacked, BitpackedForNonNeg, Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, MiniBlockLayout, Nullable, PackedStruct, PageLayout, }; @@ -125,6 +125,17 @@ impl ProtobufUtils { })), } } + pub fn bitpack2(uncompressed_bits_per_value: u64, buffer_index: u32) -> ArrayEncoding { + ArrayEncoding { + array_encoding: Some(ArrayEncodingEnum::Bitpack2(Bitpack2 { + buffer: Some(pb::Buffer { + buffer_index, + buffer_type: BufferType::Page as i32, + }), + uncompressed_bits_per_value, + })), + } + } pub fn packed_struct( child_encodings: Vec, diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 4a73cc4dda..aee7fdde95 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -561,7 +561,7 @@ impl IndexWorker { } } -pub(crate) struct PostingReader { +pub struct PostingReader { tmpdir: Option, existing_tokens: HashMap, inverted_list_reader: Option>, @@ -705,7 +705,7 @@ impl Ord for OrderedDoc { } } -pub(crate) fn 
inverted_list_schema(with_position: bool) -> SchemaRef { +pub fn inverted_list_schema(with_position: bool) -> SchemaRef { let mut fields = vec![ arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false), arrow_schema::Field::new(FREQUENCY_COL, arrow_schema::DataType::Float32, false), diff --git a/rust/lance-index/src/scalar/inverted/tokenizer.rs b/rust/lance-index/src/scalar/inverted/tokenizer.rs index 1796091f12..440def7a5a 100644 --- a/rust/lance-index/src/scalar/inverted/tokenizer.rs +++ b/rust/lance-index/src/scalar/inverted/tokenizer.rs @@ -46,7 +46,7 @@ impl Default for TokenizerConfig { impl TokenizerConfig { pub fn new(base_tokenizer: String, language: tantivy::tokenizer::Language) -> Self { - TokenizerConfig { + Self { base_tokenizer, language, max_token_length: Some(40), From 30d7f511a8489a1baaa707060ab01e1939f65716 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Wed, 30 Oct 2024 19:03:17 +0000 Subject: [PATCH 02/12] add tests for miniblock bitpack --- python/play.py | 96 ------------------- .../encodings/physical/bitpack_fastlanes.rs | 58 ++++++++++- 2 files changed, 55 insertions(+), 99 deletions(-) delete mode 100644 python/play.py diff --git a/python/play.py b/python/play.py deleted file mode 100644 index 58e29be33e..0000000000 --- a/python/play.py +++ /dev/null @@ -1,96 +0,0 @@ -import pyarrow as pa -import pyarrow.parquet as pq -import datetime -import os -import numpy as np -from lance.file import LanceFileReader, LanceFileWriter - -def format_throughput(value): - return f"{value:.2f} GiB/s" - -parquet_file_path = "/home/admin/tmp/int64_column.parquet" -lance_file_path = "/home/admin/tmp/int64_column.lance" - -# Generate 1024 * 1024 * 512 random values modulo 1024 using NumPy -values = np.random.randint(0, 1024, size=1024 * 1024 * 64 + 1, dtype=np.int64) -#values = [8] * 1024 -int64_array = pa.array(values, type=pa.int64()) -table = pa.Table.from_arrays([int64_array], names=['int64_column']) - -values = np.random.randint(0, 1024, size=1024 * 1024 * 4 + 1, dtype=np.int32) -#values = [8] * 1024 -int32_array = pa.array(values, type=pa.int32()) -table = pa.Table.from_arrays([int32_array], names=['int_column']) - -start = datetime.datetime.now() -# Write the table to a Parquet file -pq.write_table(table, parquet_file_path) -end = datetime.datetime.now() -elapsed_parquet = (end - start).total_seconds() -print(f"Parquet write time: {elapsed_parquet:.2f}s") - -start = datetime.datetime.now() -# Write the table to a Lance file -with LanceFileWriter(lance_file_path, version="2.1") as writer: - writer.write_batch(table) -end = datetime.datetime.now() -elapsed_lance = (end - start).total_seconds() -print(f"Lance write time: {elapsed_lance:.2f}s") - -# Flush file from kernel cache -os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') - -# Measure read time for Parquet file with batch size -batch_size = 32 * 1024 -start = datetime.datetime.now() -parquet_file = pq.ParquetFile(parquet_file_path) -batches = parquet_file.iter_batches(batch_size=batch_size) -tab_parquet = pa.Table.from_batches(batches) -end = datetime.datetime.now() -elapsed_parquet = (end - start).total_seconds() - -# Flush file from kernel cache again before reading Lance file -os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') - -# Measure read time for Lance file with batch size -start = datetime.datetime.now() -tab_lance = LanceFileReader(lance_file_path).read_all(batch_size=batch_size).to_table() -end = datetime.datetime.now() -elapsed_lance = (end - start).total_seconds() - 
-num_rows = tab_parquet['int_column'].length() - -print(f"Number of rows: {num_rows}") -print(f"Parquet read time: {elapsed_parquet:.2f}s") -print(f"Lance read time: {elapsed_lance:.2f}s") - -# Compute total memory size -parquet_memory_size = tab_parquet.get_total_buffer_size() -lance_memory_size = tab_lance.get_total_buffer_size() - -# Convert memory size to GiB -parquet_memory_size_gib = parquet_memory_size / (1024 * 1024 * 1024) -lance_memory_size_gib = lance_memory_size / (1024 * 1024 * 1024) - -# Compute read throughput in GiB/sec -throughput_parquet_gib = parquet_memory_size_gib / elapsed_parquet -throughput_lance_gib = lance_memory_size_gib / elapsed_lance - -# Format throughput values -formatted_throughput_parquet_gib = format_throughput(throughput_parquet_gib) -formatted_throughput_lance_gib = format_throughput(throughput_lance_gib) - -print(f"Parquet read throughput: {formatted_throughput_parquet_gib}") -print(f"Lance read throughput: {formatted_throughput_lance_gib}") - -# Check file sizes -lance_file_size = os.path.getsize(lance_file_path) -lance_file_size_mib = lance_file_size // 1048576 -parquet_file_size = os.path.getsize(parquet_file_path) -parquet_file_size_mib = parquet_file_size // 1048576 - -print(f"Parquet file size: {parquet_file_size} bytes ({parquet_file_size_mib:,} MiB)") -print(f"Lance file size: {lance_file_size} bytes ({lance_file_size_mib:,} MiB)") - -# Assert that the tables are equal -assert tab_parquet == tab_lance \ No newline at end of file diff --git a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs index ca8029c94e..7952b2d614 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs @@ -1762,9 +1762,61 @@ impl MiniBlockDecompressor for BitpackMiniBlockDecompressor { } } +#[cfg(test)] mod test { - #[test] - fn hello() { - println!("hello"); + use std::{collections::HashMap, sync::Arc}; + + use arrow::datatypes::{Int16Type, Int32Type, Int8Type}; + use arrow_array::{ + ArrayRef, DictionaryArray, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, + StringArray, UInt8Array, + }; + use arrow_buffer::{BooleanBuffer, NullBuffer}; + + use arrow_schema::DataType; + use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED}; + use rand::SeedableRng; + + use crate::buffer::LanceBuffer; + + use super::DataBlock; + + use arrow::compute::concat; + use arrow_array::Array; + + use crate::{ + testing::{check_round_trip_encoding_of_data, TestCases}, + version::LanceFileVersion, + }; + + #[test_log::test(tokio::test)] + async fn test_miniblock_bitpack() { + let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1); + + let mut arrays = vec![]; + arrays.push(Arc::new(Int8Array::from(vec![100; 1024])) as Arc); + arrays.push(Arc::new(Int8Array::from(vec![1; 1024])) as Arc); + arrays.push(Arc::new(Int8Array::from(vec![16; 1024])) as Arc); + arrays.push(Arc::new(Int8Array::from(vec![-1; 1024])) as Arc); + arrays.push(Arc::new(Int8Array::from(vec![5; 1])) as Arc); + check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await; + + for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] { + let mut int64_arrays = vec![]; + int64_arrays.push(Int64Array::from(vec![3; 1024])); + int64_arrays.push(Int64Array::from(vec![8; 1024])); + int64_arrays.push(Int64Array::from(vec![16; 1024])); + int64_arrays.push(Int64Array::from(vec![100; 1024])); + 
int64_arrays.push(Int64Array::from(vec![512; 1024])); + int64_arrays.push(Int64Array::from(vec![1000; 1024])); + int64_arrays.push(Int64Array::from(vec![2000; 1024])); + int64_arrays.push(Int64Array::from(vec![-1; 10])); + + let mut arrays = vec![]; + for int64_array in int64_arrays { + arrays.push(arrow_cast::cast(&int64_array, &data_type).unwrap()); + } + check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await; + } } } From cc6e468f3f850438f3f9cf9ae4d4bb72707b64ee Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Wed, 30 Oct 2024 19:19:17 +0000 Subject: [PATCH 03/12] lint --- .../encodings/physical/bitpack_fastlanes.rs | 37 +++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs index 7952b2d614..1a2233a7bb 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs @@ -1565,9 +1565,11 @@ mod tests { // } } -// This macro chunks the FixedWidth DataBlock, bitpacks them, -// put the bit-width parameter in front of each chunk, -// the bit-width parameter has the same bit-width as the uncompressed DataBlock +// This macro chunks the FixedWidth DataBlock, bitpacks them with 1024 values per chunk, +// it puts the bit-width parameter in front of each chunk, +// and the bit-width parameter has the same bit-width as the uncompressed DataBlock +// for example, if the input DataBlock has `bits_per_value` of `16`, there will be 2 bytes(16 bits) +// in front of each chunk storing the `bit-width` parameter. macro_rules! chunk_data_impl { ($data:expr, $data_type:ty) => {{ let data_buffer = $data.data.borrow_to_typed_slice::<$data_type>(); @@ -1719,9 +1721,10 @@ impl MiniBlockDecompressor for BitpackMiniBlockDecompressor { assert!(data.len() >= 8); assert!(num_values <= ELEMS_PER_CHUNK); - macro_rules! decompress_case { + // This macro decompresses a chunk(1024 values) of bitpacked values. + macro_rules! 
decompress_impl { ($type:ty) => {{ - let bit_width = std::mem::size_of::<$type>() * 8; + let uncompressed_bit_width = std::mem::size_of::<$type>() * 8; let mut decompressed = vec![0 as $type; ELEMS_PER_CHUNK as usize]; // Copy for memory alignment @@ -1744,7 +1747,7 @@ impl MiniBlockDecompressor for BitpackMiniBlockDecompressor { decompressed.shrink_to(num_values as usize); Ok(DataBlock::FixedWidth(FixedWidthDataBlock { data: LanceBuffer::reinterpret_vec(decompressed), - bits_per_value: bit_width as u64, + bits_per_value: uncompressed_bit_width as u64, num_values, block_info: BlockInfo::new(), used_encoding: UsedEncoding::new(), @@ -1753,10 +1756,10 @@ impl MiniBlockDecompressor for BitpackMiniBlockDecompressor { } match self.uncompressed_bit_width { - 8 => decompress_case!(u8), - 16 => decompress_case!(u16), - 32 => decompress_case!(u32), - 64 => decompress_case!(u64), + 8 => decompress_impl!(u8), + 16 => decompress_impl!(u16), + 32 => decompress_impl!(u32), + 64 => decompress_impl!(u64), _ => todo!(), } } @@ -1766,22 +1769,10 @@ impl MiniBlockDecompressor for BitpackMiniBlockDecompressor { mod test { use std::{collections::HashMap, sync::Arc}; - use arrow::datatypes::{Int16Type, Int32Type, Int8Type}; - use arrow_array::{ - ArrayRef, DictionaryArray, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, - StringArray, UInt8Array, - }; - use arrow_buffer::{BooleanBuffer, NullBuffer}; + use arrow_array::{Int64Array, Int8Array}; use arrow_schema::DataType; - use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED}; - use rand::SeedableRng; - - use crate::buffer::LanceBuffer; - - use super::DataBlock; - use arrow::compute::concat; use arrow_array::Array; use crate::{ From 1f15274054bd26be9dc113f7b76fc6fa91f629b2 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Wed, 30 Oct 2024 19:44:23 +0000 Subject: [PATCH 04/12] lint 2 --- .../src/encodings/physical/bitpack_fastlanes.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs index 1a2233a7bb..7d39a38976 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs @@ -1784,12 +1784,13 @@ mod test { async fn test_miniblock_bitpack() { let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1); - let mut arrays = vec![]; - arrays.push(Arc::new(Int8Array::from(vec![100; 1024])) as Arc); - arrays.push(Arc::new(Int8Array::from(vec![1; 1024])) as Arc); - arrays.push(Arc::new(Int8Array::from(vec![16; 1024])) as Arc); - arrays.push(Arc::new(Int8Array::from(vec![-1; 1024])) as Arc); - arrays.push(Arc::new(Int8Array::from(vec![5; 1])) as Arc); + let arrays = vec![ + Arc::new(Int8Array::from(vec![100; 1024])) as Arc, + Arc::new(Int8Array::from(vec![1; 1024])) as Arc, + Arc::new(Int8Array::from(vec![16; 1024])) as Arc, + Arc::new(Int8Array::from(vec![-1; 1024])) as Arc, + Arc::new(Int8Array::from(vec![5; 1])) as Arc, + ]; check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await; for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] { From 22159e9799377824d7cc7b3e30f3c762454cb1ef Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Wed, 30 Oct 2024 20:29:23 +0000 Subject: [PATCH 05/12] lint 3 --- python/play.py | 96 +++++++++++++++++++ .../encodings/physical/bitpack_fastlanes.rs | 19 ++-- 2 files changed, 106 insertions(+), 9 deletions(-) 
create mode 100644 python/play.py diff --git a/python/play.py b/python/play.py new file mode 100644 index 0000000000..58e29be33e --- /dev/null +++ b/python/play.py @@ -0,0 +1,96 @@ +import pyarrow as pa +import pyarrow.parquet as pq +import datetime +import os +import numpy as np +from lance.file import LanceFileReader, LanceFileWriter + +def format_throughput(value): + return f"{value:.2f} GiB/s" + +parquet_file_path = "/home/admin/tmp/int64_column.parquet" +lance_file_path = "/home/admin/tmp/int64_column.lance" + +# Generate 1024 * 1024 * 512 random values modulo 1024 using NumPy +values = np.random.randint(0, 1024, size=1024 * 1024 * 64 + 1, dtype=np.int64) +#values = [8] * 1024 +int64_array = pa.array(values, type=pa.int64()) +table = pa.Table.from_arrays([int64_array], names=['int64_column']) + +values = np.random.randint(0, 1024, size=1024 * 1024 * 4 + 1, dtype=np.int32) +#values = [8] * 1024 +int32_array = pa.array(values, type=pa.int32()) +table = pa.Table.from_arrays([int32_array], names=['int_column']) + +start = datetime.datetime.now() +# Write the table to a Parquet file +pq.write_table(table, parquet_file_path) +end = datetime.datetime.now() +elapsed_parquet = (end - start).total_seconds() +print(f"Parquet write time: {elapsed_parquet:.2f}s") + +start = datetime.datetime.now() +# Write the table to a Lance file +with LanceFileWriter(lance_file_path, version="2.1") as writer: + writer.write_batch(table) +end = datetime.datetime.now() +elapsed_lance = (end - start).total_seconds() +print(f"Lance write time: {elapsed_lance:.2f}s") + +# Flush file from kernel cache +os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') + +# Measure read time for Parquet file with batch size +batch_size = 32 * 1024 +start = datetime.datetime.now() +parquet_file = pq.ParquetFile(parquet_file_path) +batches = parquet_file.iter_batches(batch_size=batch_size) +tab_parquet = pa.Table.from_batches(batches) +end = datetime.datetime.now() +elapsed_parquet = (end - start).total_seconds() + +# Flush file from kernel cache again before reading Lance file +os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') + +# Measure read time for Lance file with batch size +start = datetime.datetime.now() +tab_lance = LanceFileReader(lance_file_path).read_all(batch_size=batch_size).to_table() +end = datetime.datetime.now() +elapsed_lance = (end - start).total_seconds() + +num_rows = tab_parquet['int_column'].length() + +print(f"Number of rows: {num_rows}") +print(f"Parquet read time: {elapsed_parquet:.2f}s") +print(f"Lance read time: {elapsed_lance:.2f}s") + +# Compute total memory size +parquet_memory_size = tab_parquet.get_total_buffer_size() +lance_memory_size = tab_lance.get_total_buffer_size() + +# Convert memory size to GiB +parquet_memory_size_gib = parquet_memory_size / (1024 * 1024 * 1024) +lance_memory_size_gib = lance_memory_size / (1024 * 1024 * 1024) + +# Compute read throughput in GiB/sec +throughput_parquet_gib = parquet_memory_size_gib / elapsed_parquet +throughput_lance_gib = lance_memory_size_gib / elapsed_lance + +# Format throughput values +formatted_throughput_parquet_gib = format_throughput(throughput_parquet_gib) +formatted_throughput_lance_gib = format_throughput(throughput_lance_gib) + +print(f"Parquet read throughput: {formatted_throughput_parquet_gib}") +print(f"Lance read throughput: {formatted_throughput_lance_gib}") + +# Check file sizes +lance_file_size = os.path.getsize(lance_file_path) +lance_file_size_mib = lance_file_size // 1048576 +parquet_file_size = 
os.path.getsize(parquet_file_path) +parquet_file_size_mib = parquet_file_size // 1048576 + +print(f"Parquet file size: {parquet_file_size} bytes ({parquet_file_size_mib:,} MiB)") +print(f"Lance file size: {lance_file_size} bytes ({lance_file_size_mib:,} MiB)") + +# Assert that the tables are equal +assert tab_parquet == tab_lance \ No newline at end of file diff --git a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs index 7d39a38976..78f19a0e73 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs @@ -1794,15 +1794,16 @@ mod test { check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await; for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] { - let mut int64_arrays = vec![]; - int64_arrays.push(Int64Array::from(vec![3; 1024])); - int64_arrays.push(Int64Array::from(vec![8; 1024])); - int64_arrays.push(Int64Array::from(vec![16; 1024])); - int64_arrays.push(Int64Array::from(vec![100; 1024])); - int64_arrays.push(Int64Array::from(vec![512; 1024])); - int64_arrays.push(Int64Array::from(vec![1000; 1024])); - int64_arrays.push(Int64Array::from(vec![2000; 1024])); - int64_arrays.push(Int64Array::from(vec![-1; 10])); + let int64_arrays = vec![ + Int64Array::from(vec![3; 1024]), + Int64Array::from(vec![8; 1024]), + Int64Array::from(vec![16; 1024]), + Int64Array::from(vec![100; 1024]), + Int64Array::from(vec![512; 1024]), + Int64Array::from(vec![1000; 1024]), + Int64Array::from(vec![2000; 1024]), + Int64Array::from(vec![-1; 10]), + ]; let mut arrays = vec![]; for int64_array in int64_arrays { From 9f5dc6dc07220ecb3e4f34b4fa2832ad7968c4b2 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Wed, 30 Oct 2024 20:31:50 +0000 Subject: [PATCH 06/12] remove a test script --- python/play.py | 96 -------------------------------------------------- 1 file changed, 96 deletions(-) delete mode 100644 python/play.py diff --git a/python/play.py b/python/play.py deleted file mode 100644 index 58e29be33e..0000000000 --- a/python/play.py +++ /dev/null @@ -1,96 +0,0 @@ -import pyarrow as pa -import pyarrow.parquet as pq -import datetime -import os -import numpy as np -from lance.file import LanceFileReader, LanceFileWriter - -def format_throughput(value): - return f"{value:.2f} GiB/s" - -parquet_file_path = "/home/admin/tmp/int64_column.parquet" -lance_file_path = "/home/admin/tmp/int64_column.lance" - -# Generate 1024 * 1024 * 512 random values modulo 1024 using NumPy -values = np.random.randint(0, 1024, size=1024 * 1024 * 64 + 1, dtype=np.int64) -#values = [8] * 1024 -int64_array = pa.array(values, type=pa.int64()) -table = pa.Table.from_arrays([int64_array], names=['int64_column']) - -values = np.random.randint(0, 1024, size=1024 * 1024 * 4 + 1, dtype=np.int32) -#values = [8] * 1024 -int32_array = pa.array(values, type=pa.int32()) -table = pa.Table.from_arrays([int32_array], names=['int_column']) - -start = datetime.datetime.now() -# Write the table to a Parquet file -pq.write_table(table, parquet_file_path) -end = datetime.datetime.now() -elapsed_parquet = (end - start).total_seconds() -print(f"Parquet write time: {elapsed_parquet:.2f}s") - -start = datetime.datetime.now() -# Write the table to a Lance file -with LanceFileWriter(lance_file_path, version="2.1") as writer: - writer.write_batch(table) -end = datetime.datetime.now() -elapsed_lance = (end - start).total_seconds() -print(f"Lance write time: 
{elapsed_lance:.2f}s") - -# Flush file from kernel cache -os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') - -# Measure read time for Parquet file with batch size -batch_size = 32 * 1024 -start = datetime.datetime.now() -parquet_file = pq.ParquetFile(parquet_file_path) -batches = parquet_file.iter_batches(batch_size=batch_size) -tab_parquet = pa.Table.from_batches(batches) -end = datetime.datetime.now() -elapsed_parquet = (end - start).total_seconds() - -# Flush file from kernel cache again before reading Lance file -os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') - -# Measure read time for Lance file with batch size -start = datetime.datetime.now() -tab_lance = LanceFileReader(lance_file_path).read_all(batch_size=batch_size).to_table() -end = datetime.datetime.now() -elapsed_lance = (end - start).total_seconds() - -num_rows = tab_parquet['int_column'].length() - -print(f"Number of rows: {num_rows}") -print(f"Parquet read time: {elapsed_parquet:.2f}s") -print(f"Lance read time: {elapsed_lance:.2f}s") - -# Compute total memory size -parquet_memory_size = tab_parquet.get_total_buffer_size() -lance_memory_size = tab_lance.get_total_buffer_size() - -# Convert memory size to GiB -parquet_memory_size_gib = parquet_memory_size / (1024 * 1024 * 1024) -lance_memory_size_gib = lance_memory_size / (1024 * 1024 * 1024) - -# Compute read throughput in GiB/sec -throughput_parquet_gib = parquet_memory_size_gib / elapsed_parquet -throughput_lance_gib = lance_memory_size_gib / elapsed_lance - -# Format throughput values -formatted_throughput_parquet_gib = format_throughput(throughput_parquet_gib) -formatted_throughput_lance_gib = format_throughput(throughput_lance_gib) - -print(f"Parquet read throughput: {formatted_throughput_parquet_gib}") -print(f"Lance read throughput: {formatted_throughput_lance_gib}") - -# Check file sizes -lance_file_size = os.path.getsize(lance_file_path) -lance_file_size_mib = lance_file_size // 1048576 -parquet_file_size = os.path.getsize(parquet_file_path) -parquet_file_size_mib = parquet_file_size // 1048576 - -print(f"Parquet file size: {parquet_file_size} bytes ({parquet_file_size_mib:,} MiB)") -print(f"Lance file size: {lance_file_size} bytes ({lance_file_size_mib:,} MiB)") - -# Assert that the tables are equal -assert tab_parquet == tab_lance \ No newline at end of file From 1ad38c17e1ac7379a81bb3d8ba540f5ac50ed010 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Wed, 30 Oct 2024 20:45:28 +0000 Subject: [PATCH 07/12] delete a irrelevant comment --- python/play.py | 96 +++++++++++++++++++ .../encodings/physical/bitpack_fastlanes.rs | 3 +- 2 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 python/play.py diff --git a/python/play.py b/python/play.py new file mode 100644 index 0000000000..58e29be33e --- /dev/null +++ b/python/play.py @@ -0,0 +1,96 @@ +import pyarrow as pa +import pyarrow.parquet as pq +import datetime +import os +import numpy as np +from lance.file import LanceFileReader, LanceFileWriter + +def format_throughput(value): + return f"{value:.2f} GiB/s" + +parquet_file_path = "/home/admin/tmp/int64_column.parquet" +lance_file_path = "/home/admin/tmp/int64_column.lance" + +# Generate 1024 * 1024 * 512 random values modulo 1024 using NumPy +values = np.random.randint(0, 1024, size=1024 * 1024 * 64 + 1, dtype=np.int64) +#values = [8] * 1024 +int64_array = pa.array(values, type=pa.int64()) +table = pa.Table.from_arrays([int64_array], names=['int64_column']) + +values = np.random.randint(0, 1024, size=1024 * 1024 * 4 + 1, 
dtype=np.int32) +#values = [8] * 1024 +int32_array = pa.array(values, type=pa.int32()) +table = pa.Table.from_arrays([int32_array], names=['int_column']) + +start = datetime.datetime.now() +# Write the table to a Parquet file +pq.write_table(table, parquet_file_path) +end = datetime.datetime.now() +elapsed_parquet = (end - start).total_seconds() +print(f"Parquet write time: {elapsed_parquet:.2f}s") + +start = datetime.datetime.now() +# Write the table to a Lance file +with LanceFileWriter(lance_file_path, version="2.1") as writer: + writer.write_batch(table) +end = datetime.datetime.now() +elapsed_lance = (end - start).total_seconds() +print(f"Lance write time: {elapsed_lance:.2f}s") + +# Flush file from kernel cache +os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') + +# Measure read time for Parquet file with batch size +batch_size = 32 * 1024 +start = datetime.datetime.now() +parquet_file = pq.ParquetFile(parquet_file_path) +batches = parquet_file.iter_batches(batch_size=batch_size) +tab_parquet = pa.Table.from_batches(batches) +end = datetime.datetime.now() +elapsed_parquet = (end - start).total_seconds() + +# Flush file from kernel cache again before reading Lance file +os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') + +# Measure read time for Lance file with batch size +start = datetime.datetime.now() +tab_lance = LanceFileReader(lance_file_path).read_all(batch_size=batch_size).to_table() +end = datetime.datetime.now() +elapsed_lance = (end - start).total_seconds() + +num_rows = tab_parquet['int_column'].length() + +print(f"Number of rows: {num_rows}") +print(f"Parquet read time: {elapsed_parquet:.2f}s") +print(f"Lance read time: {elapsed_lance:.2f}s") + +# Compute total memory size +parquet_memory_size = tab_parquet.get_total_buffer_size() +lance_memory_size = tab_lance.get_total_buffer_size() + +# Convert memory size to GiB +parquet_memory_size_gib = parquet_memory_size / (1024 * 1024 * 1024) +lance_memory_size_gib = lance_memory_size / (1024 * 1024 * 1024) + +# Compute read throughput in GiB/sec +throughput_parquet_gib = parquet_memory_size_gib / elapsed_parquet +throughput_lance_gib = lance_memory_size_gib / elapsed_lance + +# Format throughput values +formatted_throughput_parquet_gib = format_throughput(throughput_parquet_gib) +formatted_throughput_lance_gib = format_throughput(throughput_lance_gib) + +print(f"Parquet read throughput: {formatted_throughput_parquet_gib}") +print(f"Lance read throughput: {formatted_throughput_lance_gib}") + +# Check file sizes +lance_file_size = os.path.getsize(lance_file_path) +lance_file_size_mib = lance_file_size // 1048576 +parquet_file_size = os.path.getsize(parquet_file_path) +parquet_file_size_mib = parquet_file_size // 1048576 + +print(f"Parquet file size: {parquet_file_size} bytes ({parquet_file_size_mib:,} MiB)") +print(f"Lance file size: {lance_file_size} bytes ({lance_file_size_mib:,} MiB)") + +# Assert that the tables are equal +assert tab_parquet == tab_lance \ No newline at end of file diff --git a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs index 78f19a0e73..5effea796c 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs @@ -1659,7 +1659,6 @@ macro_rules! 
chunk_data_impl { }}; } -/// A compression strategy that writes fixed-width data as-is (no compression) #[derive(Debug, Default)] pub struct BitpackMiniBlockEncoder {} @@ -1684,7 +1683,7 @@ impl MiniBlockCompressor for BitpackMiniBlockEncoder { &self, chunk: DataBlock, ) -> Result<( - crate::encoder::MiniBlockCompressed, + MiniBlockCompressed, crate::format::pb::ArrayEncoding, )> { match chunk { From edae11a854526fee40bbd68d030dc901833a0195 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Wed, 30 Oct 2024 21:06:27 +0000 Subject: [PATCH 08/12] another lint --- .../src/encodings/physical/bitpack_fastlanes.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs index 5effea796c..53d0d03e99 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs @@ -1682,10 +1682,7 @@ impl MiniBlockCompressor for BitpackMiniBlockEncoder { fn compress( &self, chunk: DataBlock, - ) -> Result<( - MiniBlockCompressed, - crate::format::pb::ArrayEncoding, - )> { + ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> { match chunk { DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)), _ => Err(Error::InvalidInput { From 0f688fd38235674f21fc8a35506a38ba9d77b2e1 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Thu, 31 Oct 2024 14:39:24 +0000 Subject: [PATCH 09/12] address PR comments --- rust/lance-index/src/scalar/inverted/builder.rs | 4 ++-- rust/lance-index/src/scalar/inverted/tokenizer.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index aee7fdde95..4a73cc4dda 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -561,7 +561,7 @@ impl IndexWorker { } } -pub struct PostingReader { +pub(crate) struct PostingReader { tmpdir: Option, existing_tokens: HashMap, inverted_list_reader: Option>, @@ -705,7 +705,7 @@ impl Ord for OrderedDoc { } } -pub fn inverted_list_schema(with_position: bool) -> SchemaRef { +pub(crate) fn inverted_list_schema(with_position: bool) -> SchemaRef { let mut fields = vec![ arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false), arrow_schema::Field::new(FREQUENCY_COL, arrow_schema::DataType::Float32, false), diff --git a/rust/lance-index/src/scalar/inverted/tokenizer.rs b/rust/lance-index/src/scalar/inverted/tokenizer.rs index 440def7a5a..1796091f12 100644 --- a/rust/lance-index/src/scalar/inverted/tokenizer.rs +++ b/rust/lance-index/src/scalar/inverted/tokenizer.rs @@ -46,7 +46,7 @@ impl Default for TokenizerConfig { impl TokenizerConfig { pub fn new(base_tokenizer: String, language: tantivy::tokenizer::Language) -> Self { - Self { + TokenizerConfig { base_tokenizer, language, max_token_length: Some(40), From ca96e2185dd87555016ab6d952d43d61e95c4bd4 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Thu, 31 Oct 2024 14:47:57 +0000 Subject: [PATCH 10/12] address PR comments 2 --- protos/encodings.proto | 3 --- rust/lance-encoding/src/encoder.rs | 2 +- .../src/encodings/physical/bitpack_fastlanes.rs | 8 ++++---- rust/lance-encoding/src/format.rs | 6 +----- 4 files changed, 6 insertions(+), 13 deletions(-) diff --git a/protos/encodings.proto b/protos/encodings.proto index 66adb647d9..ac66d13dfa 100644 --- 
a/protos/encodings.proto +++ b/protos/encodings.proto @@ -214,9 +214,6 @@ message BitpackedForNonNeg { message Bitpack2 { // the number of bits of the uncompressed value. e.g. for a u32, this will be 32 uint64 uncompressed_bits_per_value = 2; - - // The items in the list - Buffer buffer = 3; } // An array encoding for shredded structs that will never be null diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 96aa0c724b..eac25c8feb 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -779,7 +779,7 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { ) -> Result> { assert!(field.data_type().byte_width() > 0); if let DataBlock::FixedWidth(ref fixed_width_data) = data { - if fixed_width_data.bits_per_value <= 64 { + if fixed_width_data.bits_per_value == 8 || fixed_width_data.bits_per_value == 16 || fixed_width_data.bits_per_value == 32 || fixed_width_data.bits_per_value == 64 { return Ok(Box::new(BitpackMiniBlockEncoder::default())); } } diff --git a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs index 53d0d03e99..17d75fc4a9 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs @@ -30,6 +30,7 @@ use crate::statistics::{GetStat, Stat}; use arrow::array::ArrayRef; use bytemuck::cast_slice; const ELEMS_PER_CHUNK: u64 = 1024; +const LOG_ELEMS_PER_CHUNK: u8 = 10; // Compute the compressed_bit_width for a given array of integers // todo: compute all statistics before encoding @@ -1578,7 +1579,6 @@ macro_rules! chunk_data_impl { let bit_widths = $data .get_stat(Stat::BitWidth) .expect("FixedWidthDataBlock should have valid bit width statistics"); - println!("bit_widths statistics got"); let bit_widths_array = bit_widths .as_any() .downcast_ref::>() @@ -1617,7 +1617,7 @@ macro_rules! chunk_data_impl { } chunks.push(MiniBlockChunk { num_bytes: ((1 + packed_chunk_sizes[i]) * std::mem::size_of::<$data_type>()) as u16, - log_num_values: 10, + log_num_values: LOG_ELEMS_PER_CHUNK, }); } @@ -1654,7 +1654,7 @@ macro_rules! 
chunk_data_impl { chunks, num_values: $data.num_values, }, - ProtobufUtils::bitpack2($data.bits_per_value, 0), + ProtobufUtils::bitpack2($data.bits_per_value), ) }}; } @@ -1687,7 +1687,7 @@ impl MiniBlockCompressor for BitpackMiniBlockEncoder { DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)), _ => Err(Error::InvalidInput { source: format!( - "Cannot compress a data block of type {} with ValueEncoder", + "Cannot compress a data block of type {} with BitpackMiniBlockEncoder", chunk.name() ) .into(), diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 8938636589..ba66b15dc8 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -125,13 +125,9 @@ impl ProtobufUtils { })), } } - pub fn bitpack2(uncompressed_bits_per_value: u64, buffer_index: u32) -> ArrayEncoding { + pub fn bitpack2(uncompressed_bits_per_value: u64) -> ArrayEncoding { ArrayEncoding { array_encoding: Some(ArrayEncodingEnum::Bitpack2(Bitpack2 { - buffer: Some(pb::Buffer { - buffer_index, - buffer_type: BufferType::Page as i32, - }), uncompressed_bits_per_value, })), } From 553e124e4dd488b9c212338010864aa893da5417 Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Thu, 31 Oct 2024 14:49:14 +0000 Subject: [PATCH 11/12] remove a test script --- python/play.py | 96 -------------------------------------------------- 1 file changed, 96 deletions(-) delete mode 100644 python/play.py diff --git a/python/play.py b/python/play.py deleted file mode 100644 index 58e29be33e..0000000000 --- a/python/play.py +++ /dev/null @@ -1,96 +0,0 @@ -import pyarrow as pa -import pyarrow.parquet as pq -import datetime -import os -import numpy as np -from lance.file import LanceFileReader, LanceFileWriter - -def format_throughput(value): - return f"{value:.2f} GiB/s" - -parquet_file_path = "/home/admin/tmp/int64_column.parquet" -lance_file_path = "/home/admin/tmp/int64_column.lance" - -# Generate 1024 * 1024 * 512 random values modulo 1024 using NumPy -values = np.random.randint(0, 1024, size=1024 * 1024 * 64 + 1, dtype=np.int64) -#values = [8] * 1024 -int64_array = pa.array(values, type=pa.int64()) -table = pa.Table.from_arrays([int64_array], names=['int64_column']) - -values = np.random.randint(0, 1024, size=1024 * 1024 * 4 + 1, dtype=np.int32) -#values = [8] * 1024 -int32_array = pa.array(values, type=pa.int32()) -table = pa.Table.from_arrays([int32_array], names=['int_column']) - -start = datetime.datetime.now() -# Write the table to a Parquet file -pq.write_table(table, parquet_file_path) -end = datetime.datetime.now() -elapsed_parquet = (end - start).total_seconds() -print(f"Parquet write time: {elapsed_parquet:.2f}s") - -start = datetime.datetime.now() -# Write the table to a Lance file -with LanceFileWriter(lance_file_path, version="2.1") as writer: - writer.write_batch(table) -end = datetime.datetime.now() -elapsed_lance = (end - start).total_seconds() -print(f"Lance write time: {elapsed_lance:.2f}s") - -# Flush file from kernel cache -os.system('sync; echo 3 | sudo tee /proc/sys/vm/drop_caches') - -# Measure read time for Parquet file with batch size -batch_size = 32 * 1024 -start = datetime.datetime.now() -parquet_file = pq.ParquetFile(parquet_file_path) -batches = parquet_file.iter_batches(batch_size=batch_size) -tab_parquet = pa.Table.from_batches(batches) -end = datetime.datetime.now() -elapsed_parquet = (end - start).total_seconds() - -# Flush file from kernel cache again before reading Lance file -os.system('sync; echo 3 | sudo tee 
/proc/sys/vm/drop_caches') - -# Measure read time for Lance file with batch size -start = datetime.datetime.now() -tab_lance = LanceFileReader(lance_file_path).read_all(batch_size=batch_size).to_table() -end = datetime.datetime.now() -elapsed_lance = (end - start).total_seconds() - -num_rows = tab_parquet['int_column'].length() - -print(f"Number of rows: {num_rows}") -print(f"Parquet read time: {elapsed_parquet:.2f}s") -print(f"Lance read time: {elapsed_lance:.2f}s") - -# Compute total memory size -parquet_memory_size = tab_parquet.get_total_buffer_size() -lance_memory_size = tab_lance.get_total_buffer_size() - -# Convert memory size to GiB -parquet_memory_size_gib = parquet_memory_size / (1024 * 1024 * 1024) -lance_memory_size_gib = lance_memory_size / (1024 * 1024 * 1024) - -# Compute read throughput in GiB/sec -throughput_parquet_gib = parquet_memory_size_gib / elapsed_parquet -throughput_lance_gib = lance_memory_size_gib / elapsed_lance - -# Format throughput values -formatted_throughput_parquet_gib = format_throughput(throughput_parquet_gib) -formatted_throughput_lance_gib = format_throughput(throughput_lance_gib) - -print(f"Parquet read throughput: {formatted_throughput_parquet_gib}") -print(f"Lance read throughput: {formatted_throughput_lance_gib}") - -# Check file sizes -lance_file_size = os.path.getsize(lance_file_path) -lance_file_size_mib = lance_file_size // 1048576 -parquet_file_size = os.path.getsize(parquet_file_path) -parquet_file_size_mib = parquet_file_size // 1048576 - -print(f"Parquet file size: {parquet_file_size} bytes ({parquet_file_size_mib:,} MiB)") -print(f"Lance file size: {lance_file_size} bytes ({lance_file_size_mib:,} MiB)") - -# Assert that the tables are equal -assert tab_parquet == tab_lance \ No newline at end of file From dcabd0f58f86d311bd312b3c773068b58325941d Mon Sep 17 00:00:00 2001 From: broccoliSpicy Date: Thu, 31 Oct 2024 16:13:51 +0000 Subject: [PATCH 12/12] lint --- rust/lance-encoding/src/encoder.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index eac25c8feb..3d3e267f2a 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -779,7 +779,11 @@ impl CompressionStrategy for CoreArrayEncodingStrategy { ) -> Result> { assert!(field.data_type().byte_width() > 0); if let DataBlock::FixedWidth(ref fixed_width_data) = data { - if fixed_width_data.bits_per_value == 8 || fixed_width_data.bits_per_value == 16 || fixed_width_data.bits_per_value == 32 || fixed_width_data.bits_per_value == 64 { + if fixed_width_data.bits_per_value == 8 + || fixed_width_data.bits_per_value == 16 + || fixed_width_data.bits_per_value == 32 + || fixed_width_data.bits_per_value == 64 + { return Ok(Box::new(BitpackMiniBlockEncoder::default())); } }
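

Note on the chunk format introduced in this series: `BitpackMiniBlockEncoder` frames every miniblock chunk as a bit-width word stored at the uncompressed width, followed by 1024 values packed at that bit width, and the final chunk is zero-padded up to 1024 values before packing. The sketch below is a minimal, self-contained illustration of that framing and its size math, not the patch's code path: `pack_chunk` and `unpack_chunk` are hypothetical stand-ins using a plain LSB-first bit-serial packer and a fixed little-endian `u32` header, whereas the patch packs with the fastlanes `BitPacking` trait (whose transposed, SIMD-friendly bit layout differs) and sizes the header to match the uncompressed type (u8/u16/u32/u64).

const ELEMS_PER_CHUNK: usize = 1024;

// Pack one chunk: a little-endian u32 bit-width header, then 1024 values at
// `bit_width` bits each, LSB-first. Every value must fit in `bit_width` bits.
fn pack_chunk(values: &[u32; ELEMS_PER_CHUNK], bit_width: u32) -> Vec<u8> {
    assert!((1..=32).contains(&bit_width));
    let mut out = Vec::with_capacity(4 + ELEMS_PER_CHUNK * bit_width as usize / 8 + 1);
    out.extend_from_slice(&bit_width.to_le_bytes()); // per-chunk header
    let (mut acc, mut nbits) = (0u64, 0u32); // bit accumulator
    for &v in values {
        acc |= u64::from(v) << nbits;
        nbits += bit_width;
        while nbits >= 8 {
            out.push((acc & 0xFF) as u8);
            acc >>= 8;
            nbits -= 8;
        }
    }
    if nbits > 0 {
        out.push(acc as u8); // flush the trailing partial byte
    }
    out
}

// Reverse of `pack_chunk`: read the bit-width header, then decode 1024 values.
fn unpack_chunk(bytes: &[u8]) -> (u32, Vec<u32>) {
    let bit_width = u32::from_le_bytes(bytes[..4].try_into().unwrap());
    let mask = if bit_width == 32 {
        u64::from(u32::MAX)
    } else {
        (1u64 << bit_width) - 1
    };
    let (mut acc, mut nbits, mut pos) = (0u64, 0u32, 4usize);
    let mut values = Vec::with_capacity(ELEMS_PER_CHUNK);
    for _ in 0..ELEMS_PER_CHUNK {
        while nbits < bit_width {
            acc |= u64::from(bytes[pos]) << nbits;
            pos += 1;
            nbits += 8;
        }
        values.push((acc & mask) as u32);
        acc >>= bit_width;
        nbits -= bit_width;
    }
    (bit_width, values)
}

fn main() {
    // Values below 1024 need 10 bits each, like the random data in play.py.
    let vals = [513u32; ELEMS_PER_CHUNK];
    let packed = pack_chunk(&vals, 10);
    // 4-byte header + 1024 * 10 bits of payload.
    assert_eq!(packed.len(), 4 + ELEMS_PER_CHUNK * 10 / 8);
    let (bit_width, unpacked) = unpack_chunk(&packed);
    assert_eq!(bit_width, 10);
    assert_eq!(&unpacked[..], &vals[..]);
    println!("packed 1024 u32s into {} bytes at bit width {}", packed.len(), bit_width);
}

For `bit_width = 10` (values below 1024, as in play.py), a u32 chunk shrinks from 4096 bytes to 1284: one 4-byte header plus (1024 * 10) / 32 = 320 packed words, matching the `(1024 * bit_width) / $data.bits_per_value` chunk-size computation in `chunk_data_impl`.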