From 8fe7147f84425a3356fec77d0957151dc3762eea Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 3 Jan 2025 08:17:08 -0800 Subject: [PATCH] feat: cache miniblock metadata (#3323) The miniblock chunk data was always intended to be cached. This is essential for good random access performance. This PR adds that caching. --- rust/lance-core/src/cache.rs | 19 +- rust/lance-core/src/datatypes.rs | 3 + rust/lance-core/src/utils/path.rs | 2 +- rust/lance-datagen/src/generator.rs | 39 ++++- rust/lance-encoding/src/data.rs | 2 +- .../src/encodings/logical/primitive.rs | 165 ++++++++++++++---- 6 files changed, 194 insertions(+), 36 deletions(-) diff --git a/rust/lance-core/src/cache.rs b/rust/lance-core/src/cache.rs index 3cc8800f56..8479044fe6 100644 --- a/rust/lance-core/src/cache.rs +++ b/rust/lance-core/src/cache.rs @@ -6,7 +6,6 @@ use std::any::{Any, TypeId}; use std::sync::Arc; -use deepsize::{Context, DeepSizeOf}; use futures::Future; use moka::sync::Cache; use object_store::path::Path; @@ -14,6 +13,8 @@ use object_store::path::Path; use crate::utils::path::LancePathExt; use crate::Result; +pub use deepsize::{Context, DeepSizeOf}; + type ArcAny = Arc; #[derive(Clone)] @@ -121,6 +122,12 @@ impl FileMetadataCache { } } + /// Fetch an item from the cache, using a str as the key + pub fn get_by_str(&self, path: &str) -> Option> { + self.get(&Path::parse(path).unwrap()) + } + + /// Fetch an item from the cache pub fn get(&self, path: &Path) -> Option> { let cache = self.cache.as_ref()?; let temp: Path; @@ -135,6 +142,7 @@ impl FileMetadataCache { .map(|metadata| metadata.record.clone().downcast::().unwrap()) } + /// Insert an item into the cache pub fn insert(&self, path: Path, metadata: Arc) { let Some(cache) = self.cache.as_ref() else { return; @@ -147,6 +155,15 @@ impl FileMetadataCache { cache.insert((path, TypeId::of::()), SizedRecord::new(metadata)); } + /// Insert an item into the cache, using a str as the key + pub fn insert_by_str( + &self, + key: &str, + metadata: Arc, + ) { + self.insert(Path::parse(key).unwrap(), metadata); + } + /// Get an item /// /// If it exists in the cache return that diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index 0214bb17d1..2f3fed4972 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -29,6 +29,9 @@ pub const COMPRESSION_LEVEL_META_KEY: &str = "lance-encoding:compression-level"; pub const BLOB_META_KEY: &str = "lance-encoding:blob"; pub const PACKED_STRUCT_LEGACY_META_KEY: &str = "packed"; pub const PACKED_STRUCT_META_KEY: &str = "lance-encoding:packed"; +pub const STRUCTURAL_ENCODING_META_KEY: &str = "lance-encoding:structural-encoding"; +pub const STRUCTURAL_ENCODING_MINIBLOCK: &str = "miniblock"; +pub const STRUCTURAL_ENCODING_FULLZIP: &str = "fullzip"; lazy_static::lazy_static! { pub static ref BLOB_DESC_FIELDS: Fields = diff --git a/rust/lance-core/src/utils/path.rs b/rust/lance-core/src/utils/path.rs index 72d7311894..fb4ec56eb4 100644 --- a/rust/lance-core/src/utils/path.rs +++ b/rust/lance-core/src/utils/path.rs @@ -11,7 +11,7 @@ impl LancePathExt for Path { fn child_path(&self, path: &Path) -> Path { let mut new_path = self.clone(); for part in path.parts() { - new_path = path.child(part); + new_path = new_path.child(part); } new_path } diff --git a/rust/lance-datagen/src/generator.rs b/rust/lance-datagen/src/generator.rs index 3d8f4d8012..fbb9f63b3a 100644 --- a/rust/lance-datagen/src/generator.rs +++ b/rust/lance-datagen/src/generator.rs @@ -1307,6 +1307,38 @@ impl BatchGeneratorBuilder { } } +/// Factory for creating a single random array +pub struct ArrayGeneratorBuilder { + generator: Box, + seed: Option, +} + +impl ArrayGeneratorBuilder { + fn new(generator: Box) -> Self { + Self { + generator, + seed: None, + } + } + + /// Use the given seed for the generator + pub fn with_seed(mut self, seed: Seed) -> Self { + self.seed = Some(seed); + self + } + + /// Generate a single array with the given length + pub fn into_array_rows( + mut self, + length: RowCount, + ) -> Result, ArrowError> { + let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64( + self.seed.map(|s| s.0).unwrap_or(DEFAULT_SEED.0), + ); + self.generator.generate(length, &mut rng) + } +} + const MS_PER_DAY: i64 = 86400000; pub mod array { @@ -1858,11 +1890,16 @@ pub mod array { } } -/// Create a BatchGeneratorBuilder to start generating data +/// Create a BatchGeneratorBuilder to start generating batch data pub fn gen() -> BatchGeneratorBuilder { BatchGeneratorBuilder::default() } +/// Create an ArrayGeneratorBuilder to start generating array data +pub fn gen_array(gen: Box) -> ArrayGeneratorBuilder { + ArrayGeneratorBuilder::new(gen) +} + /// Create a BatchGeneratorBuilder with the given schema /// /// You can add more columns or convert this into a reader immediately diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs index c0d3e27791..7ee182b9f6 100644 --- a/rust/lance-encoding/src/data.rs +++ b/rust/lance-encoding/src/data.rs @@ -928,7 +928,7 @@ impl DataBlock { Self::Empty() => Self::Empty(), Self::Constant(inner) => Self::Constant(inner), Self::AllNull(_) => panic!("Cannot remove validity on all-null data"), - Self::Nullable(inner) => *inner.data, + Self::Nullable(inner) => inner.data.remove_validity(), Self::FixedWidth(inner) => Self::FixedWidth(inner), Self::FixedSizeList(inner) => Self::FixedSizeList(inner.remove_validity()), Self::VariableWidth(inner) => Self::VariableWidth(inner), diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index e51a82baf9..342a4d7c72 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -17,8 +17,18 @@ use arrow_schema::{DataType, Field as ArrowField}; use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, TryStreamExt}; use itertools::Itertools; use lance_arrow::deepcopy::deep_copy_array; -use lance_core::utils::bit::pad_bytes; -use lance_core::utils::hash::U8SliceKey; +use lance_core::{ + cache::FileMetadataCache, + datatypes::{ + STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK, + }, + utils::bit::pad_bytes, + Error, +}; +use lance_core::{ + cache::{Context, DeepSizeOf}, + utils::hash::U8SliceKey, +}; use log::{debug, trace}; use snafu::{location, Location}; @@ -268,7 +278,11 @@ impl FieldScheduler for PrimitiveFieldScheduler { /// a single page. trait StructuralPageScheduler: std::fmt::Debug + Send { /// Fetches any metadata required for the page - fn initialize<'a>(&'a mut self, io: &Arc) -> BoxFuture<'a, Result<()>>; + fn initialize<'a>( + &'a mut self, + io: &Arc, + cache: &Arc, + ) -> BoxFuture<'a, Result<()>>; /// Schedules the read of the given ranges in the page fn schedule_ranges( &self, @@ -835,7 +849,12 @@ impl ComplexAllNullScheduler { } impl StructuralPageScheduler for ComplexAllNullScheduler { - fn initialize<'a>(&'a mut self, io: &Arc) -> BoxFuture<'a, Result<()>> { + fn initialize<'a>( + &'a mut self, + io: &Arc, + // TODO: Utilize cache here + _: &Arc, + ) -> BoxFuture<'a, Result<()>> { // Fully load the rep & def buffers, as needed let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0]; let (def_pos, def_size) = self.buffer_offsets_and_sizes[1]; @@ -998,7 +1017,11 @@ impl DecodePageTask for DecodeComplexAllNullTask { pub struct SimpleAllNullScheduler {} impl StructuralPageScheduler for SimpleAllNullScheduler { - fn initialize<'a>(&'a mut self, _io: &Arc) -> BoxFuture<'a, Result<()>> { + fn initialize<'a>( + &'a mut self, + _io: &Arc, + _cache: &Arc, + ) -> BoxFuture<'a, Result<()>> { std::future::ready(Ok(())).boxed() } @@ -1060,9 +1083,32 @@ struct MiniBlockSchedulerDictionary { dictionary_decompressor: Arc, dictionary_buf_position_and_size: (u64, u64), dictionary_data_alignment: u64, +} - // This is set after initialization - dictionary_data: Arc, +/// State that is loaded once and cached for future lookups +#[derive(Debug)] +struct MiniBlockCacheableState { + /// Metadata that describes each chunk in the page + chunk_meta: Vec, + /// The repetition index for each chunk + /// + /// There will be one element per chunk if no repetition (# items) + /// Otherwise, there will be one element plus N elements where N + /// is the maximum nested random access supported + rep_index: Vec>, + /// The dictionary for the page, if any + dictionary: Option>, +} + +impl DeepSizeOf for MiniBlockCacheableState { + fn deep_size_of_children(&self, context: &mut Context) -> usize { + self.rep_index.deep_size_of_children(context) + + self + .dictionary + .as_ref() + .map(|dict| dict.data_size() as usize) + .unwrap_or(0) + } } /// A scheduler for a page that has been encoded with the mini-block layout @@ -1098,15 +1144,14 @@ pub struct MiniBlockScheduler { priority: u64, items_in_page: u64, repetition_index_depth: u16, + cache_key: String, rep_decompressor: Arc, def_decompressor: Arc, value_decompressor: Arc, def_meaning: Arc<[DefinitionInterpretation]>, - // These are set after initialization - chunk_meta: Vec, - rep_index: Vec>, - dictionary: Option, + // This is set after initialization + page_meta: Option>, } impl MiniBlockScheduler { @@ -1114,6 +1159,8 @@ impl MiniBlockScheduler { buffer_offsets_and_sizes: &[(u64, u64)], priority: u64, items_in_page: u64, + page_number: usize, + column_number: usize, layout: &pb::MiniBlockLayout, decompressors: &dyn DecompressorStrategy, ) -> Result { @@ -1137,7 +1184,6 @@ impl MiniBlockScheduler { .into(), dictionary_buf_position_and_size: buffer_offsets_and_sizes[2], dictionary_data_alignment: 4, - dictionary_data: Arc::new(DataBlock::Empty()), }) } pb::array_encoding::ArrayEncoding::Flat(_) => Some(MiniBlockSchedulerDictionary { @@ -1146,7 +1192,6 @@ impl MiniBlockScheduler { .into(), dictionary_buf_position_and_size: buffer_offsets_and_sizes[2], dictionary_data_alignment: 16, - dictionary_data: Arc::new(DataBlock::Empty()), }), _ => { unreachable!("Currently only encodings `BinaryBlock` and `Flat` used for encoding MiniBlock dictionary.") @@ -1156,6 +1201,8 @@ impl MiniBlockScheduler { None }; + let cache_key = format!("miniblock/{}/{}", page_number, column_number); + Ok(Self { buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(), rep_decompressor: rep_decompressor.into(), @@ -1163,19 +1210,20 @@ impl MiniBlockScheduler { value_decompressor: value_decompressor.into(), repetition_index_depth: layout.repetition_index_depth as u16, priority, + cache_key, items_in_page, - chunk_meta: Vec::new(), - rep_index: Vec::new(), dictionary, def_meaning: def_meaning.into(), + page_meta: None, }) } fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec { + let page_meta = self.page_meta.as_ref().unwrap(); chunk_indices .iter() .map(|&chunk_idx| { - let chunk_meta = &self.chunk_meta[chunk_idx]; + let chunk_meta = &page_meta.chunk_meta[chunk_idx]; let bytes_start = chunk_meta.offset_bytes; let bytes_end = bytes_start + chunk_meta.chunk_size_bytes; LoadedChunk { @@ -1429,7 +1477,16 @@ impl ChunkInstructions { } impl StructuralPageScheduler for MiniBlockScheduler { - fn initialize<'a>(&'a mut self, io: &Arc) -> BoxFuture<'a, Result<()>> { + fn initialize<'a>( + &'a mut self, + io: &Arc, + cache: &Arc, + ) -> BoxFuture<'a, Result<()>> { + if let Some(cached_state) = cache.get_by_str(&self.cache_key) { + self.page_meta = Some(cached_state); + return Box::pin(std::future::ready(Ok(()))); + } + // We always need to fetch chunk metadata. We may also need to fetch a dictionary and // we may also need to fetch the repetition index. Here, we gather what buffers we // need. @@ -1457,6 +1514,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { } let io_req = io.submit_request(required_ranges, 0); + let cache = cache.clone(); async move { let mut buffers = io_req.await?.into_iter().fuse(); let meta_bytes = buffers.next().unwrap(); @@ -1468,7 +1526,11 @@ impl StructuralPageScheduler for MiniBlockScheduler { let mut bytes = LanceBuffer::from_bytes(meta_bytes, 2); let words = bytes.borrow_to_typed_slice::(); let words = words.as_ref(); - self.chunk_meta.reserve(words.len()); + let mut page_meta = MiniBlockCacheableState { + chunk_meta: Vec::with_capacity(words.len()), + rep_index: Vec::with_capacity(words.len()), + dictionary: None, + }; let mut rows_counter = 0; let mut offset_bytes = value_buf_position; for (word_idx, word) in words.iter().enumerate() { @@ -1485,7 +1547,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { }; rows_counter += num_values; - self.chunk_meta.push(ChunkMeta { + page_meta.chunk_meta.push(ChunkMeta { num_values, chunk_size_bytes: num_bytes as u64, offset_bytes, @@ -1501,7 +1563,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { let mut repetition_index_vals = LanceBuffer::from_bytes(rep_index_data, 8); let repetition_index_vals = repetition_index_vals.borrow_to_typed_slice::(); // Unflatten - self.rep_index = repetition_index_vals + page_meta.rep_index = repetition_index_vals .as_ref() .chunks_exact(self.repetition_index_depth as usize + 1) .map(|c| c.to_vec()) @@ -1509,7 +1571,7 @@ impl StructuralPageScheduler for MiniBlockScheduler { } else { // Default rep index is just the number of items in each chunk // with 0 partials/leftovers - self.rep_index = self + page_meta.rep_index = page_meta .chunk_meta .iter() .map(|c| vec![c.num_values, 0]) @@ -1519,14 +1581,17 @@ impl StructuralPageScheduler for MiniBlockScheduler { // decode dictionary if let Some(ref mut dictionary) = self.dictionary { let dictionary_data = dictionary_bytes.unwrap(); - dictionary.dictionary_data = - Arc::new(dictionary.dictionary_decompressor.decompress( + page_meta.dictionary = + Some(Arc::new(dictionary.dictionary_decompressor.decompress( LanceBuffer::from_bytes( dictionary_data, dictionary.dictionary_data_alignment, ), - )?) + )?)); }; + let page_meta = Arc::new(page_meta); + cache.insert_by_str(&self.cache_key, page_meta.clone()); + self.page_meta = Some(page_meta); Ok(()) } .boxed() @@ -1537,7 +1602,10 @@ impl StructuralPageScheduler for MiniBlockScheduler { ranges: &[Range], io: &dyn EncodingsIo, ) -> Result>>> { - let chunk_instructions = ChunkInstructions::schedule_instructions(&self.rep_index, ranges); + let page_meta = self.page_meta.as_ref().unwrap(); + + let chunk_instructions = + ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges); let num_rows = ranges.iter().map(|r| r.end - r.start).sum(); debug_assert_eq!( @@ -1570,10 +1638,10 @@ impl StructuralPageScheduler for MiniBlockScheduler { let rep_decompressor = self.rep_decompressor.clone(); let def_decompressor = self.def_decompressor.clone(); let value_decompressor = self.value_decompressor.clone(); - let dictionary = self + let dictionary = page_meta .dictionary .as_ref() - .map(|dictionary| dictionary.dictionary_data.clone()); + .map(|dictionary| dictionary.clone()); let def_meaning = self.def_meaning.clone(); Ok(async move { @@ -1650,7 +1718,11 @@ impl FullZipScheduler { } impl StructuralPageScheduler for FullZipScheduler { - fn initialize<'a>(&'a mut self, _io: &Arc) -> BoxFuture<'a, Result<()>> { + fn initialize<'a>( + &'a mut self, + _io: &Arc, + _: &Arc, + ) -> BoxFuture<'a, Result<()>> { std::future::ready(Ok(())).boxed() } @@ -1974,7 +2046,12 @@ impl StructuralPrimitiveFieldScheduler { .iter() .enumerate() .map(|(page_index, page_info)| { - Self::page_info_to_scheduler(page_info, page_index, decompressors) + Self::page_info_to_scheduler( + page_info, + page_index, + column_info.index as usize, + decompressors, + ) }) .collect::>>()?; Ok(Self { @@ -1986,6 +2063,7 @@ impl StructuralPrimitiveFieldScheduler { fn page_info_to_scheduler( page_info: &PageInfo, page_index: usize, + column_index: usize, decompressors: &dyn DecompressorStrategy, ) -> Result { let scheduler: Box = @@ -1995,6 +2073,8 @@ impl StructuralPrimitiveFieldScheduler { &page_info.buffer_offsets_and_sizes, page_info.priority, mini_block.num_items, + page_index, + column_index, mini_block, decompressors, )?) @@ -2045,7 +2125,7 @@ impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler { let page_init = self .page_schedulers .iter_mut() - .map(|s| s.scheduler.initialize(context.io())) + .map(|s| s.scheduler.initialize(context.io(), context.cache())) .collect::>(); async move { page_init.try_collect::>().await?; @@ -2657,6 +2737,25 @@ impl PrimitiveStructuralEncoder { false } + fn prefers_miniblock(data_block: &DataBlock, field: &Field) -> bool { + // If the user specifically requested miniblock then use it + if let Some(user_requested) = field.metadata.get(STRUCTURAL_ENCODING_META_KEY) { + return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK; + } + // Otherwise only use miniblock if it is narrow + Self::is_narrow(data_block) + } + + fn prefers_fullzip(field: &Field) -> bool { + // Fullzip is the backup option so the only reason we wouldn't use it is if the + // user specifically requested not to use it (in which case we're probably going + // to emit an error) + if let Some(user_requested) = field.metadata.get(STRUCTURAL_ENCODING_META_KEY) { + return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP; + } + true + } + // Converts value data, repetition levels, and definition levels into a single // buffer of mini-blocks. In addition, creates a buffer of mini-block metadata // which tells us the size of each block. Finally, if repetition is present then @@ -3376,7 +3475,7 @@ impl PrimitiveStructuralEncoder { Some(dictionary_data_block), num_rows, ) - } else if Self::is_narrow(&data_block) { + } else if Self::prefers_miniblock(&data_block, &field) { log::debug!( "Encoding column {} with {} items using mini-block layout", column_idx, @@ -3392,7 +3491,7 @@ impl PrimitiveStructuralEncoder { None, num_rows, ) - } else { + } else if Self::prefers_fullzip(&field) { log::debug!( "Encoding column {} with {} items using full-zip layout", column_idx, @@ -3406,6 +3505,8 @@ impl PrimitiveStructuralEncoder { repdefs, row_number, ) + } else { + Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() }) } } })