diff --git a/python/src/file.rs b/python/src/file.rs
index d4f9bf586b..9d14d13ff0 100644
--- a/python/src/file.rs
+++ b/python/src/file.rs
@@ -19,6 +19,7 @@ use arrow_schema::Schema as ArrowSchema;
 use bytes::Bytes;
 use futures::stream::StreamExt;
 use lance::io::{ObjectStore, RecordBatchStream};
+use lance_core::cache::FileMetadataCache;
 use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
 use lance_file::{
     v2::{
@@ -331,9 +332,14 @@ impl LanceFileReader {
             },
         );
         let file = scheduler.open_file(&path).await.infer_error()?;
-        let inner = FileReader::try_open(file, None, DecoderMiddlewareChain::default())
-            .await
-            .infer_error()?;
+        let inner = FileReader::try_open(
+            file,
+            None,
+            Arc::<DecoderMiddlewareChain>::default(),
+            &FileMetadataCache::no_cache(),
+        )
+        .await
+        .infer_error()?;
         Ok(Self {
             inner: Arc::new(inner),
         })
diff --git a/rust/lance-core/src/cache.rs b/rust/lance-core/src/cache.rs
index 289b48d358..731473a062 100644
--- a/rust/lance-core/src/cache.rs
+++ b/rust/lance-core/src/cache.rs
@@ -11,6 +11,7 @@ use futures::Future;
 use moka::sync::Cache;
 use object_store::path::Path;
 
+use crate::utils::path::LancePathExt;
 use crate::Result;
 
 pub const DEFAULT_INDEX_CACHE_SIZE: usize = 128;
@@ -21,7 +22,7 @@ type ArcAny = Arc<dyn Any + Send + Sync>;
 #[derive(Clone)]
 struct SizedRecord {
     record: ArcAny,
-    size_accessor: Arc<dyn Fn(ArcAny) -> usize + Send + Sync>,
+    size_accessor: Arc<dyn Fn(&ArcAny) -> usize + Send + Sync>,
 }
 
 impl std::fmt::Debug for SizedRecord {
@@ -35,7 +36,7 @@ impl SizedRecord {
     fn new<T: DeepSizeOf + Send + Sync + 'static>(record: Arc<T>) -> Self {
         let size_accessor =
-            |record: ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() };
+            |record: &ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() };
         Self {
             record,
             size_accessor: Arc::new(size_accessor),
@@ -48,38 +49,104 @@ impl SizedRecord {
 /// The cache is keyed by the file path and the type of metadata.
 #[derive(Clone, Debug)]
 pub struct FileMetadataCache {
-    cache: Arc<Cache<(Path, TypeId), SizedRecord>>,
+    cache: Option<Arc<Cache<(Path, TypeId), SizedRecord>>>,
+    base_path: Option<Path>,
 }
 
 impl DeepSizeOf for FileMetadataCache {
     fn deep_size_of_children(&self, _: &mut Context) -> usize {
         self.cache
-            .iter()
-            .map(|(_, v)| (v.size_accessor)(v.record))
-            .sum()
+            .as_ref()
+            .map(|cache| {
+                cache
+                    .iter()
+                    .map(|(_, v)| (v.size_accessor)(&v.record))
+                    .sum()
+            })
+            .unwrap_or(0)
     }
 }
 
+pub enum CapacityMode {
+    Items,
+    Bytes,
+}
+
 impl FileMetadataCache {
+    /// Instantiates a new cache which, for legacy reasons, uses Items capacity mode.
     pub fn new(capacity: usize) -> Self {
         Self {
-            cache: Arc::new(Cache::new(capacity as u64)),
+            cache: Some(Arc::new(Cache::new(capacity as u64))),
+            base_path: None,
+        }
+    }
+
+    /// Instantiates a dummy cache that will never cache anything.
+    pub fn no_cache() -> Self {
+        Self {
+            cache: None,
+            base_path: None,
+        }
+    }
+
+    /// Instantiates a new cache with a given capacity and capacity mode.
+    pub fn with_capacity(capacity: usize, mode: CapacityMode) -> Self {
+        match mode {
+            CapacityMode::Items => Self::new(capacity),
+            CapacityMode::Bytes => Self {
+                cache: Some(Arc::new(
+                    Cache::builder()
+                        .weigher(|_, v: &SizedRecord| {
+                            (v.size_accessor)(&v.record).try_into().unwrap_or(u32::MAX)
+                        })
+                        .build(),
+                )),
+                base_path: None,
+            },
+        }
+    }
+
+    /// Creates a new cache which shares the same underlying cache but prepends `base_path` to all
+    /// keys.
+    pub fn with_base_path(&self, base_path: Path) -> Self {
+        Self {
+            cache: self.cache.clone(),
+            base_path: Some(base_path),
         }
     }
 
     pub fn size(&self) -> usize {
-        self.cache.entry_count() as usize
+        if let Some(cache) = self.cache.as_ref() {
+            cache.entry_count() as usize
+        } else {
+            0
+        }
     }
 
     pub fn get<T: Send + Sync + 'static>(&self, path: &Path) -> Option<Arc<T>> {
-        self.cache
+        let cache = self.cache.as_ref()?;
+        let temp: Path;
+        let path = if let Some(base_path) = &self.base_path {
+            temp = base_path.child_path(path);
+            &temp
+        } else {
+            path
+        };
+        cache
            .get(&(path.to_owned(), TypeId::of::<T>()))
            .map(|metadata| metadata.record.clone().downcast::<T>().unwrap())
     }
 
     pub fn insert<T: DeepSizeOf + Send + Sync + 'static>(&self, path: Path, metadata: Arc<T>) {
-        self.cache
-            .insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
+        let Some(cache) = self.cache.as_ref() else {
+            return;
+        };
+        let path = if let Some(base_path) = &self.base_path {
+            base_path.child_path(&path)
+        } else {
+            path
+        };
+        cache.insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
     }
 
     /// Get an item
diff --git a/rust/lance-core/src/utils.rs b/rust/lance-core/src/utils.rs
index 0ec72545d9..86676466ab 100644
--- a/rust/lance-core/src/utils.rs
+++ b/rust/lance-core/src/utils.rs
@@ -7,6 +7,7 @@ pub mod cpu;
 pub mod deletion;
 pub mod futures;
 pub mod mask;
+pub mod path;
 pub mod testing;
 pub mod tokio;
 pub mod tracing;
diff --git a/rust/lance-core/src/utils/path.rs b/rust/lance-core/src/utils/path.rs
new file mode 100644
index 0000000000..72d7311894
--- /dev/null
+++ b/rust/lance-core/src/utils/path.rs
@@ -0,0 +1,18 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use object_store::path::Path;
+
+pub trait LancePathExt {
+    fn child_path(&self, path: &Path) -> Path;
+}
+
+impl LancePathExt for Path {
+    fn child_path(&self, path: &Path) -> Path {
+        let mut new_path = self.clone();
+        for part in path.parts() {
+            new_path = new_path.child(part);
+        }
+        new_path
+    }
+}
diff --git a/rust/lance-datafusion/src/substrait.rs b/rust/lance-datafusion/src/substrait.rs
index a2db60b871..7c86479622 100644
--- a/rust/lance-datafusion/src/substrait.rs
+++ b/rust/lance-datafusion/src/substrait.rs
@@ -271,22 +271,31 @@ pub async fn parse_substrait(expr: &[u8], input_schema: Arc) -> Result,
     /// As we visit the decoding tree we populate this with the pushdown
     /// information that is available.
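// Illustrative sketch (not part of the patch): how the FileMetadataCache constructors
// introduced above are meant to compose. Only APIs shown in this diff are used; the
// function name, capacity, and path values are placeholders.
use lance_core::cache::{CapacityMode, FileMetadataCache};
use object_store::path::Path;

fn example_caches() -> FileMetadataCache {
    // Bounded by estimated total size in bytes (each record weighed via DeepSizeOf).
    let byte_cache = FileMetadataCache::with_capacity(128 * 1024 * 1024, CapacityMode::Bytes);
    // Legacy item-count capacity, and a cache that never stores anything.
    let _item_cache = FileMetadataCache::new(128);
    let _disabled = FileMetadataCache::no_cache();
    // Scope a shared cache to one file: same underlying cache, keys prefixed with the path.
    byte_cache.with_base_path(Path::from("some/file.lance"))
}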
- zone_map_buffers: Vec, + zone_map_buffers: HashMap, } /// This strategy is responsible for creating the field scheduler @@ -60,7 +63,7 @@ impl LanceDfFieldDecoderStrategy { if state.is_none() { *state = Some(LanceDfFieldDecoderState { rows_per_map: None, - zone_map_buffers: Vec::new(), + zone_map_buffers: HashMap::new(), }); true } else { @@ -68,7 +71,12 @@ impl LanceDfFieldDecoderStrategy { } } - fn add_pushdown_field(&self, rows_per_map: u32, unloaded_pushdown: UnloadedPushdown) { + fn add_pushdown_field( + &self, + field: &Field, + rows_per_map: u32, + unloaded_pushdown: UnloadedPushdown, + ) { let mut state = self.state.lock().unwrap(); let state = state.as_mut().unwrap(); match state.rows_per_map { @@ -79,7 +87,9 @@ impl LanceDfFieldDecoderStrategy { state.rows_per_map = Some(rows_per_map); } } - state.zone_map_buffers.push(unloaded_pushdown); + state + .zone_map_buffers + .insert(field.id as u32, unloaded_pushdown); } } @@ -96,55 +106,40 @@ impl FieldDecoderStrategy for LanceDfFieldDecoderStrategy { )> { let is_root = self.initialize(); - if let Some((rows_per_map, unloaded_pushdown)) = extract_zone_info( - column_infos.next().unwrap(), - &field.data_type(), - chain.current_path(), - ) { + if let Some((rows_per_map, unloaded_pushdown)) = + extract_zone_info(column_infos, &field.data_type(), chain.current_path()) + { // If there is pushdown info then record it and unwrap the // pushdown encoding layer. - self.add_pushdown_field(rows_per_map, unloaded_pushdown); + self.add_pushdown_field(field, rows_per_map, unloaded_pushdown); } // Delegate to the rest of the chain to create the decoder let (chain, next) = chain.next(field, column_infos, buffers)?; // If this is the top level decoder then wrap it with our // pushdown filtering scheduler. - let state = if is_root { - self.state.lock().unwrap().take() - } else { - None - }; - let schema = self.schema.clone(); - let _io = chain.io().clone(); - - let next = next?; if is_root { - let state = state.unwrap(); + let state = self.state.lock().unwrap().take().unwrap(); + let schema = self.schema.clone(); let rows_per_map = state.rows_per_map; let zone_map_buffers = state.zone_map_buffers; + let next = next?; let num_rows = next.num_rows(); if rows_per_map.is_none() { // No columns had any pushdown info Ok((chain, Ok(next))) } else { - let mut _scheduler = ZoneMapsFieldScheduler::new( + let scheduler = ZoneMapsFieldScheduler::new( next, schema, zone_map_buffers, rows_per_map.unwrap(), num_rows, ); - // Load all the zone maps from disk - // TODO: it would be slightly more efficient to do this - // later when we know what columns are actually used - // for filtering. 
- // scheduler.initialize(io.as_ref()).await?; - // Ok(Arc::new(scheduler) as Arc) - todo!() + Ok((chain, Ok(Arc::new(scheduler)))) } } else { - Ok((chain, Ok(next))) + Ok((chain, next)) } } } diff --git a/rust/lance-encoding-datafusion/src/substrait.rs b/rust/lance-encoding-datafusion/src/substrait.rs index 299b84c8dd..4e81e70195 100644 --- a/rust/lance-encoding-datafusion/src/substrait.rs +++ b/rust/lance-encoding-datafusion/src/substrait.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use arrow_schema::Schema as ArrowSchema; use bytes::Bytes; -use datafusion_common::DFSchema; use datafusion_common::ScalarValue; use datafusion_expr::Expr; use futures::FutureExt; @@ -19,7 +18,7 @@ use lance_encoding::decoder::FilterExpression; pub trait FilterExpressionExt { /// Convert a lance-encoding filter expression (which we assume is /// substrait encoded) into a datafusion expr - fn substrait_to_df(&self, schema: &Schema) -> Result<(Expr, DFSchema)>; + fn substrait_to_df(&self, schema: Arc) -> Result; /// Convert a datafusion filter expression into a lance-encoding /// filter expression (using substrait) fn df_to_substrait(expr: Expr, schema: &Schema) -> Result @@ -28,19 +27,12 @@ pub trait FilterExpressionExt { } impl FilterExpressionExt for FilterExpression { - fn substrait_to_df(&self, schema: &Schema) -> Result<(Expr, DFSchema)> { + fn substrait_to_df(&self, schema: Arc) -> Result { if self.0.is_empty() { - return Ok(( - Expr::Literal(ScalarValue::Boolean(Some(true))), - DFSchema::empty(), - )); + return Ok(Expr::Literal(ScalarValue::Boolean(Some(true)))); } - let input_schema = Arc::new(ArrowSchema::from(schema)); - let expr = parse_substrait(&self.0, input_schema.clone()) - .now_or_never() - .unwrap()?; - let df_schema = DFSchema::try_from(input_schema.as_ref().clone())?; - Ok((expr, df_schema)) + let expr = parse_substrait(&self.0, schema).now_or_never().unwrap()?; + Ok(expr) } fn df_to_substrait(expr: Expr, schema: &Schema) -> Result diff --git a/rust/lance-encoding-datafusion/src/zone.rs b/rust/lance-encoding-datafusion/src/zone.rs index 627ada5ee6..7b38140e11 100644 --- a/rust/lance-encoding-datafusion/src/zone.rs +++ b/rust/lance-encoding-datafusion/src/zone.rs @@ -1,12 +1,16 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{collections::VecDeque, ops::Range, sync::Arc}; +use std::{ + collections::{HashMap, VecDeque}, + ops::Range, + sync::{Arc, Mutex}, +}; use arrow_array::{cast::AsArray, types::UInt32Type, ArrayRef, RecordBatch, UInt32Array}; use arrow_schema::{Field as ArrowField, Schema as ArrowSchema}; use bytes::Bytes; -use datafusion_common::{arrow::datatypes::DataType, DFSchemaRef, ScalarValue}; +use datafusion_common::{arrow::datatypes::DataType, DFSchema, DFSchemaRef, ScalarValue}; use datafusion_expr::{ col, execution_props::ExecutionProps, @@ -18,10 +22,11 @@ use datafusion_functions::core::expr_ext::FieldAccessor; use datafusion_optimizer::simplify_expressions::ExprSimplifier; use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator}; use futures::{future::BoxFuture, FutureExt}; +use lance_datafusion::planner::Planner; use lance_encoding::{ buffer::LanceBuffer, decoder::{ - decode_batch, ColumnInfo, DecoderMiddlewareChain, FieldScheduler, FilterExpression, + decode_batch, ColumnInfoIter, DecoderMiddlewareChain, FieldScheduler, FilterExpression, PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob, }, encoder::{ @@ -32,7 +37,7 @@ use lance_encoding::{ EncodingsIo, }; -use 
lance_core::{datatypes::Schema, Error, Result}; +use lance_core::{cache::FileMetadataCache, datatypes::Schema, Error, Result}; use lance_file::v2::{reader::EncodedBatchReaderExt, writer::EncodedBatchWriteExt}; use snafu::{location, Location}; @@ -123,38 +128,43 @@ fn path_to_expr(path: &VecDeque) -> Expr { /// If a column has zone info in the encoding description then extract it pub(crate) fn extract_zone_info( - _column_info: &ColumnInfo, - _data_type: &DataType, - _cur_path: &VecDeque, + column_info: &mut ColumnInfoIter, + data_type: &DataType, + cur_path: &VecDeque, ) -> Option<(u32, UnloadedPushdown)> { - todo!() - // let encoding = column_info.encoding.column_encoding.take().unwrap(); - // match encoding { - // pb::column_encoding::ColumnEncoding::ZoneIndex(mut zone_index) => { - // let inner = zone_index.inner.take().unwrap(); - // let rows_per_zone = zone_index.rows_per_zone; - // let zone_map_buffer = zone_index.zone_map_buffer.as_ref().unwrap().clone(); - // assert_eq!( - // zone_map_buffer.buffer_type, - // i32::from(pb::buffer::BufferType::Column) - // ); - // let (position, size) = - // column_info.buffer_offsets_and_sizes[zone_map_buffer.buffer_index as usize]; - // column_info.encoding = *inner; - // let column = path_to_expr(cur_path); - // let unloaded_pushdown = UnloadedPushdown { - // data_type: data_type.clone(), - // column, - // position, - // size, - // }; - // Some((rows_per_zone, unloaded_pushdown)) - // } - // _ => { - // column_info.encoding.column_encoding = Some(encoding); - // None - // } - // } + let mut result: Option<(u32, UnloadedPushdown)> = None; + let result_ref = &mut result; + column_info.peek_transform(|col_info| { + let encoding = col_info.encoding.column_encoding.as_ref().unwrap(); + match *encoding { + pb::column_encoding::ColumnEncoding::ZoneIndex(ref zone_index) => { + let mut zone_index = zone_index.clone(); + let inner = zone_index.inner.take().unwrap(); + let rows_per_zone = zone_index.rows_per_zone; + let zone_map_buffer = zone_index.zone_map_buffer.as_ref().unwrap().clone(); + assert_eq!( + zone_map_buffer.buffer_type, + i32::from(pb::buffer::BufferType::Column) + ); + let (position, size) = + col_info.buffer_offsets_and_sizes[zone_map_buffer.buffer_index as usize]; + let column = path_to_expr(cur_path); + let unloaded_pushdown = UnloadedPushdown { + data_type: data_type.clone(), + column, + position, + size, + }; + *result_ref = Some((rows_per_zone, unloaded_pushdown)); + + let mut col_info = col_info.as_ref().clone(); + col_info.encoding = *inner; + Arc::new(col_info) + } + _ => col_info, + } + }); + result } /// Extracted pushdown information obtained from the column encoding @@ -171,23 +181,36 @@ pub struct UnloadedPushdown { size: u64, } +#[derive(Debug)] +struct ZoneMap { + items: Vec<(Expr, NullableInterval)>, +} + +#[derive(Debug)] +struct InitializedState { + zone_maps: Vec, + filter: Option, + df_schema: Option, +} + /// A top level scheduler that refines the requested range based on /// pushdown filtering with zone maps #[derive(Debug)] pub struct ZoneMapsFieldScheduler { inner: Arc, schema: Arc, - pushdown_buffers: Vec, - zone_guarantees: Arc>>, + // A map from field id to unloaded zone map for that field + pushdown_buffers: HashMap, rows_per_zone: u32, num_rows: u64, + initialized_state: Mutex>, } impl ZoneMapsFieldScheduler { pub fn new( inner: Arc, schema: Arc, - pushdown_buffers: Vec, + pushdown_buffers: HashMap, rows_per_zone: u32, num_rows: u64, ) -> Self { @@ -195,58 +218,103 @@ impl ZoneMapsFieldScheduler { inner, schema, 
pushdown_buffers, - zone_guarantees: Arc::default(), rows_per_zone, num_rows, + // These are set during initialization + initialized_state: Mutex::new(None), } } + async fn load_pushdowns( + &self, + io: &dyn EncodingsIo, + _cache: &FileMetadataCache, + pushdowns: &[&UnloadedPushdown], + ) -> Result> { + // TODO: Use cache + let ranges = pushdowns + .iter() + .map(|pushdown| pushdown.position..pushdown.position + pushdown.size) + .collect(); + let buffers = io.submit_request(ranges, 0).await?; + let mut maps = Vec::new(); + for (buffer, pushdown) in buffers.into_iter().zip(pushdowns.iter()) { + // There's no point in running this in parallel since it's actually synchronous + let map = self + .parse_zone(buffer, &pushdown.data_type, &pushdown.column) + .await?; + maps.push(map); + } + // A this point each item in `maps` is a vector of guarantees for a single field + // We need to transpose this so that each item is a vector of guarantees for a single zone + let zone_maps = transpose2(maps) + .into_iter() + .map(|items| ZoneMap { items }) + .collect(); + Ok(zone_maps) + } + /// Load the zone maps from the file - /// - /// TODO: only load zone maps for columns used in the filter - pub fn initialize<'a>(&'a mut self, io: &dyn EncodingsIo) -> BoxFuture<'a, Result<()>> { - let ranges = self - .pushdown_buffers + async fn load_maps( + &self, + io: &dyn EncodingsIo, + cache: &FileMetadataCache, + filter_schema: &Schema, + ) -> Result> { + let pushdowns_to_load = filter_schema + .fields .iter() - .map(|unloaded_pushdown| { - unloaded_pushdown.position..(unloaded_pushdown.position + unloaded_pushdown.size) + .filter_map(|field| { + let field_id = field.id as u32; + let unloaded = self.pushdown_buffers.get(&field_id)?; + Some(unloaded) }) .collect::>(); - let zone_maps_fut = io.submit_request(ranges, 0); - async move { - let zone_map_buffers = zone_maps_fut.await?; - let mut all_fields = Vec::with_capacity(zone_map_buffers.len()); - for (bytes, unloaded_pushdown) in - zone_map_buffers.iter().zip(self.pushdown_buffers.iter()) - { - let guarantees = self - .map_from_buffer( - bytes.clone(), - &unloaded_pushdown.data_type, - &unloaded_pushdown.column, - ) - .await?; - all_fields.push(guarantees); - } - self.zone_guarantees = Arc::new(transpose2(all_fields)); - Ok(()) - } - .boxed() + self.load_pushdowns(io, cache, &pushdowns_to_load).await } - fn process_filter( + async fn do_initialize( &self, - filter: Expr, - projection_schema: DFSchemaRef, - ) -> Result bool> { - let zone_guarantees = self.zone_guarantees.clone(); + io: &dyn EncodingsIo, + cache: &FileMetadataCache, + filter: &FilterExpression, + ) -> Result<()> { + if filter.is_noop() { + return Ok(()); + } + + let arrow_schema = ArrowSchema::from(self.schema.as_ref()); + let df_schema = DFSchema::try_from(arrow_schema.clone())?; + let df_filter = filter.substrait_to_df(Arc::new(arrow_schema))?; + + let columns = Planner::column_names_in_expr(&df_filter); + let referenced_schema = self.schema.project(&columns)?; + + let df_schema = Some(Arc::new(df_schema)); + let zone_maps = self.load_maps(io, cache, &referenced_schema).await?; + let filter = Some(df_filter); + + let state = InitializedState { + zone_maps, + filter, + df_schema, + }; + let mut initialized_state = self.initialized_state.lock().unwrap(); + *initialized_state = Some(state); + Ok(()) + } + + fn create_filter(&self) -> Result bool + '_> { Ok(move |zone_idx| { - let guarantees = &zone_guarantees[zone_idx as usize]; + let state = self.initialized_state.lock().unwrap(); + let state = 
state.as_ref().unwrap(); + let zone_map = &state.zone_maps[zone_idx as usize]; let props = ExecutionProps::new(); - let context = SimplifyContext::new(&props).with_schema(projection_schema.clone()); + let context = + SimplifyContext::new(&props).with_schema(state.df_schema.as_ref().unwrap().clone()); let mut simplifier = ExprSimplifier::new(context); - simplifier = simplifier.with_guarantees(guarantees.clone()); - match simplifier.simplify(filter.clone()) { + simplifier = simplifier.with_guarantees(zone_map.items.clone()); + match simplifier.simplify(state.filter.as_ref().unwrap().clone()) { Ok(expr) => match expr { // Predicate, given guarantees, is always false, we can skip the zone Expr::Literal(ScalarValue::Boolean(Some(false))) => false, @@ -301,7 +369,7 @@ impl ZoneMapsFieldScheduler { guarantees } - async fn map_from_buffer( + async fn parse_zone( &self, buffer: Bytes, data_type: &DataType, @@ -317,7 +385,7 @@ impl ZoneMapsFieldScheduler { let zone_maps_batch = decode_batch( &zone_maps_batch, &FilterExpression::no_filter(), - &DecoderMiddlewareChain::default(), + Arc::::default(), ) .await?; @@ -370,13 +438,27 @@ impl SchedulingJob for EmptySchedulingJob { } impl FieldScheduler for ZoneMapsFieldScheduler { + fn initialize<'a>( + &'a self, + filter: &'a FilterExpression, + context: &'a SchedulerContext, + ) -> BoxFuture<'a, Result<()>> { + async move { + self.do_initialize(context.io().as_ref(), context.cache(), filter) + .await + } + .boxed() + } + fn schedule_ranges<'a>( &'a self, ranges: &[std::ops::Range], filter: &FilterExpression, ) -> Result> { - let (df_filter, projection_schema) = filter.substrait_to_df(self.schema.as_ref())?; - let zone_filter_fn = self.process_filter(df_filter, Arc::new(projection_schema))?; + if filter.is_noop() { + return self.inner.schedule_ranges(ranges, filter); + } + let zone_filter_fn = self.create_filter()?; let zone_filter = ZoneMapsFilter::new(zone_filter_fn, self.rows_per_zone as u64); let ranges = zone_filter.refine_ranges(ranges); if ranges.is_empty() { @@ -528,18 +610,20 @@ impl FieldEncoder for ZoneMapsFieldEncoder { } fn flush(&mut self) -> Result> { - if self.cur_offset > 0 { - // Create final map - self.new_map()?; - } self.items_encoder.flush() } fn finish(&mut self) -> BoxFuture<'_, Result>> { async move { + if self.cur_offset > 0 { + // Create final map + self.new_map()?; + } let items_columns = self.items_encoder.finish().await?; - if items_columns.is_empty() { - return Err(Error::invalid_input("attempt to apply zone maps to a field encoder that generated zero columns of data".to_string(), location!())) + if items_columns.len() != 1 { + return Err(Error::InvalidInput { + source: format!("attempt to apply zone maps to a field encoder that generated {} columns of data (expected 1)", items_columns.len()).into(), + location: location!()}) } let items_column = items_columns.into_iter().next().unwrap(); let final_pages = items_column.final_pages; @@ -593,8 +677,6 @@ mod tests { }; #[test_log::test(tokio::test)] - #[ignore] // Stats currently disabled until https://github.com/lancedb/lance/issues/2605 - // is addressed async fn test_basic_stats() { let data = lance_datagen::gen() .col("0", lance_datagen::array::step::()) @@ -609,11 +691,13 @@ mod tests { let written_file = write_lance_file(data, &fs, options).await; - let decoder_middleware = DecoderMiddlewareChain::new() - .add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new( - written_file.schema.clone(), - ))) - .add_strategy(Arc::new(CoreFieldDecoderStrategy::default())); + let 
decoder_middleware = Arc::new( + DecoderMiddlewareChain::new() + .add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new( + written_file.schema.clone(), + ))) + .add_strategy(Arc::new(CoreFieldDecoderStrategy::default())), + ); let num_rows = written_file .data @@ -629,15 +713,9 @@ mod tests { .await; assert_eq!(num_rows, result); - let decoder_middleware = DecoderMiddlewareChain::new() - .add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new( - written_file.schema.clone(), - ))) - .add_strategy(Arc::new(CoreFieldDecoderStrategy::default())); - let result = count_lance_file( &fs, - decoder_middleware, + decoder_middleware.clone(), FilterExpression::df_to_substrait( Expr::BinaryExpr(BinaryExpr { left: Box::new(col("0")), @@ -651,12 +729,6 @@ mod tests { .await; assert_eq!(0, result); - let decoder_middleware = DecoderMiddlewareChain::new() - .add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new( - written_file.schema.clone(), - ))) - .add_strategy(Arc::new(CoreFieldDecoderStrategy::default())); - let result = count_lance_file( &fs, decoder_middleware, diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs index 21155f2453..7c22bb71e1 100644 --- a/rust/lance-encoding/benches/decoder.rs +++ b/rust/lance-encoding/benches/decoder.rs @@ -88,7 +88,7 @@ fn bench_decode(c: &mut Criterion) { .block_on(lance_encoding::decoder::decode_batch( &encoded, &FilterExpression::no_filter(), - &DecoderMiddlewareChain::default(), + Arc::::default(), )) .unwrap(); assert_eq!(data.num_rows(), batch.num_rows()); @@ -127,7 +127,7 @@ fn bench_decode_fsl(c: &mut Criterion) { .block_on(lance_encoding::decoder::decode_batch( &encoded, &FilterExpression::no_filter(), - &DecoderMiddlewareChain::default(), + Arc::::default(), )) .unwrap(); assert_eq!(data.num_rows(), batch.num_rows()); @@ -183,7 +183,7 @@ fn bench_decode_str_with_dict_encoding(c: &mut Criterion) { .block_on(lance_encoding::decoder::decode_batch( &encoded, &FilterExpression::no_filter(), - &DecoderMiddlewareChain::default(), + Arc::::default(), )) .unwrap(); assert_eq!(data.num_rows(), batch.num_rows()); @@ -218,7 +218,6 @@ fn bench_decode_packed_struct(c: &mut Criterion) { .iter() .map(|field| { if matches!(field.data_type(), &DataType::Struct(_)) { - println!("Match"); let mut metadata = HashMap::new(); metadata.insert("packed".to_string(), "true".to_string()); let field = @@ -253,7 +252,7 @@ fn bench_decode_packed_struct(c: &mut Criterion) { .block_on(lance_encoding::decoder::decode_batch( &encoded, &FilterExpression::no_filter(), - &DecoderMiddlewareChain::default(), + Arc::::default(), )) .unwrap(); assert_eq!(data.num_rows(), batch.num_rows()); @@ -301,7 +300,7 @@ fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) { .block_on(lance_encoding::decoder::decode_batch( &encoded, &FilterExpression::no_filter(), - &DecoderMiddlewareChain::default(), + Arc::::default(), )) .unwrap(); assert_eq!(data.num_rows(), batch.num_rows()); diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 97d8ccee7d..8229a7ac27 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -221,9 +221,10 @@ use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::{DataType, Field as ArrowField, Fields, Schema as ArrowSchema}; use bytes::Bytes; use futures::future::BoxFuture; -use futures::stream::BoxStream; +use futures::stream::{self, BoxStream}; use futures::{FutureExt, StreamExt}; use lance_arrow::DataTypeExt; +use lance_core::cache::{CapacityMode, 
FileMetadataCache}; use lance_core::datatypes::{Field, Schema}; use log::{debug, trace, warn}; use snafu::{location, Location}; @@ -313,6 +314,7 @@ impl ColumnInfo { pub struct DecodeBatchScheduler { pub root_scheduler: Arc, pub root_fields: Fields, + cache: Arc, } /// Represents a series of decoder strategies @@ -347,10 +349,7 @@ impl DecoderMiddlewareChain { /// Obtain a cursor into the chain that can be used to create /// field schedulers - pub(crate) fn cursor<'a>( - &'a self, - io: &'a Arc, - ) -> DecoderMiddlewareChainCursor<'a> { + pub(crate) fn cursor(&self, io: Arc) -> DecoderMiddlewareChainCursor<'_> { DecoderMiddlewareChainCursor { chain: self, io, @@ -367,7 +366,7 @@ impl DecoderMiddlewareChain { /// to create a scheduler from an inner encoding. pub struct DecoderMiddlewareChainCursor<'a> { chain: &'a DecoderMiddlewareChain, - io: &'a Arc, + io: Arc, path: VecDeque, cur_idx: usize, } @@ -385,7 +384,7 @@ impl<'a> DecoderMiddlewareChainCursor<'a> { /// Returns the I/O service which can be used to grab column metadata pub fn io(&self) -> &Arc { - self.io + &self.io } /// Delegates responsibilty to the next encoder in the chain @@ -467,14 +466,14 @@ impl<'a> DecoderMiddlewareChainCursor<'a> { } pub struct ColumnInfoIter<'a> { - column_infos: &'a [ColumnInfo], + column_infos: Vec>, column_indices: &'a [u32], column_info_pos: usize, column_indices_pos: usize, } impl<'a> ColumnInfoIter<'a> { - pub fn new(column_infos: &'a [ColumnInfo], column_indices: &'a [u32]) -> Self { + pub fn new(column_infos: Vec>, column_indices: &'a [u32]) -> Self { let initial_pos = column_indices[0] as usize; Self { column_infos, @@ -484,11 +483,17 @@ impl<'a> ColumnInfoIter<'a> { } } - pub fn peek(&self) -> &'a ColumnInfo { + pub fn peek(&self) -> &Arc { &self.column_infos[self.column_info_pos] } - pub fn expect_next(&mut self) -> Result<&'a ColumnInfo> { + pub fn peek_transform(&mut self, transform: impl FnOnce(Arc) -> Arc) { + let column_info = self.column_infos[self.column_info_pos].clone(); + let transformed = transform(column_info); + self.column_infos[self.column_info_pos] = transformed; + } + + pub fn expect_next(&mut self) -> Result<&Arc> { self.next().ok_or_else(|| { Error::invalid_input( "there were more fields in the schema than provided column indices", @@ -497,20 +502,7 @@ impl<'a> ColumnInfoIter<'a> { }) } - pub(crate) fn next_top_level(&mut self) { - self.column_indices_pos += 1; - if self.column_indices_pos < self.column_indices.len() { - self.column_info_pos = self.column_indices[self.column_indices_pos] as usize; - } else { - self.column_info_pos = self.column_infos.len(); - } - } -} - -impl<'a> Iterator for ColumnInfoIter<'a> { - type Item = &'a ColumnInfo; - - fn next(&mut self) -> Option { + fn next(&mut self) -> Option<&Arc> { if self.column_info_pos < self.column_infos.len() { let info = &self.column_infos[self.column_info_pos]; self.column_info_pos += 1; @@ -519,6 +511,15 @@ impl<'a> Iterator for ColumnInfoIter<'a> { None } } + + pub(crate) fn next_top_level(&mut self) { + self.column_indices_pos += 1; + if self.column_indices_pos < self.column_indices.len() { + self.column_info_pos = self.column_indices[self.column_indices_pos] as usize; + } else { + self.column_info_pos = self.column_infos.len(); + } + } } // A trait that handles the mapping from Arrow schema to field decoders. 
@@ -735,9 +736,9 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy { } } DataType::List(items_field) | DataType::LargeList(items_field) => { - let offsets_column = column_infos.expect_next()?; + let offsets_column = column_infos.expect_next()?.clone(); column_infos.next_top_level(); - Self::ensure_values_encoded(offsets_column, chain.current_path())?; + Self::ensure_values_encoded(offsets_column.as_ref(), chain.current_path())?; let offsets_column_buffers = ColumnBuffers { file_buffers: buffers, positions_and_sizes: &offsets_column.buffer_offsets_and_sizes, @@ -877,48 +878,66 @@ fn root_column(num_rows: u64) -> ColumnInfo { impl DecodeBatchScheduler { /// Creates a new decode scheduler with the expected schema and the column /// metadata of the file. - pub fn try_new<'a>( + #[allow(clippy::too_many_arguments)] + pub async fn try_new<'a>( schema: &'a Schema, column_indices: &[u32], column_infos: &[Arc], file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>, num_rows: u64, - decoder_strategy: &DecoderMiddlewareChain, - io: &Arc, + decoder_strategy: Arc, + io: Arc, + cache: Arc, + filter: &FilterExpression, ) -> Result { + assert!(num_rows > 0); let buffers = FileBuffers { positions_and_sizes: file_buffer_positions_and_sizes, }; let arrow_schema = ArrowSchema::from(schema); let root_fields = arrow_schema.fields().clone(); let mut columns = Vec::with_capacity(column_infos.len() + 1); - columns.push(root_column(num_rows)); - columns.extend(column_infos.iter().map(|col| col.as_ref().clone())); + columns.push(Arc::new(root_column(num_rows))); + columns.extend(column_infos.iter().cloned()); let adjusted_column_indices = [0_u32] .into_iter() .chain(column_indices.iter().map(|i| *i + 1)) .collect::>(); - let mut column_iter = ColumnInfoIter::new(&columns, &adjusted_column_indices); + let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices); let root_type = DataType::Struct(root_fields.clone()); let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?; + // root_field.children and schema.fields should be identical at this point but the latter + // has field ids and the former does not. This line restores that. + // TODO: Is there another way to create the root field without forcing a trip through arrow? 
+ root_field.children = schema.fields.clone(); root_field .metadata .insert("__lance_decoder_root".to_string(), "true".to_string()); let (_, root_scheduler) = decoder_strategy - .cursor(io) + .cursor(io.clone()) .start(&root_field, &mut column_iter, buffers)?; let root_scheduler = root_scheduler?; + + let context = SchedulerContext::new(io, cache.clone()); + root_scheduler.initialize(filter, &context).await?; + Ok(Self { root_scheduler, root_fields, + cache, }) } - pub fn from_scheduler(root_scheduler: Arc, root_fields: Fields) -> Self { + pub fn from_scheduler( + root_scheduler: Arc, + root_fields: Fields, + cache: Arc, + ) -> Self { Self { root_scheduler, root_fields, + cache, } } @@ -946,7 +965,7 @@ impl DecodeBatchScheduler { .unwrap_or_default() ); - let mut context = SchedulerContext::new(io); + let mut context = SchedulerContext::new(io, self.cache.clone()); let maybe_root_job = self.root_scheduler.schedule_ranges(ranges, filter); if let Err(schedule_ranges_err) = maybe_root_job { schedule_action(Err(schedule_ranges_err)); @@ -1285,6 +1304,137 @@ impl BatchDecodeStream { } } +#[derive(Debug)] +pub enum RequestedRows { + Ranges(Vec>), + Indices(Vec), +} + +impl RequestedRows { + pub fn num_rows(&self) -> u64 { + match self { + Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(), + Self::Indices(indices) => indices.len() as u64, + } + } +} + +#[derive(Debug, Clone)] +pub struct SchedulerDecoderConfig { + pub decoder_strategy: Arc, + pub batch_size: u32, + pub io: Arc, + pub cache: Arc, +} + +fn check_scheduler_on_drop( + stream: BoxStream<'static, ReadBatchTask>, + scheduler_handle: tokio::task::JoinHandle<()>, +) -> BoxStream<'static, ReadBatchTask> { + // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which + // will panic if the scheduler panicked). This let's us check if the scheduler panicked + // when the stream finishes. 
+    let mut scheduler_handle = Some(scheduler_handle);
+    let check_scheduler = stream::unfold((), move |_| {
+        let handle = scheduler_handle.take();
+        async move {
+            if let Some(handle) = handle {
+                handle.await.unwrap();
+            }
+            None
+        }
+    });
+    stream.chain(check_scheduler).boxed()
+}
+
+async fn create_scheduler_decoder(
+    column_infos: Vec<Arc<ColumnInfo>>,
+    requested_rows: RequestedRows,
+    filter: FilterExpression,
+    column_indices: Vec<u32>,
+    target_schema: Arc<Schema>,
+    config: SchedulerDecoderConfig,
+) -> Result<BoxStream<'static, ReadBatchTask>> {
+    let num_rows = requested_rows.num_rows();
+
+    let mut decode_scheduler = DecodeBatchScheduler::try_new(
+        target_schema.as_ref(),
+        &column_indices,
+        &column_infos,
+        &vec![],
+        num_rows,
+        config.decoder_strategy,
+        config.io.clone(),
+        config.cache,
+        &filter,
+    )
+    .await?;
+
+    let root_decoder = match &requested_rows {
+        RequestedRows::Ranges(ranges) => decode_scheduler.new_root_decoder_ranges(ranges),
+        RequestedRows::Indices(indices) => decode_scheduler.new_root_decoder_indices(indices),
+    };
+
+    let (tx, rx) = mpsc::unbounded_channel();
+
+    let io = config.io;
+    let scheduler_handle = tokio::task::spawn(async move {
+        match requested_rows {
+            RequestedRows::Ranges(ranges) => {
+                decode_scheduler.schedule_ranges(&ranges, &filter, tx, io)
+            }
+            RequestedRows::Indices(indices) => {
+                decode_scheduler.schedule_take(&indices, &filter, tx, io)
+            }
+        }
+    });
+
+    let decode_stream =
+        BatchDecodeStream::new(rx, config.batch_size, num_rows, root_decoder).into_stream();
+
+    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
+}
+
+/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to
+/// decode the scheduled data, returning the decoded data as a stream of record batches.
+///
+/// This is a convenience function that creates both the scheduler and the decoder,
+/// which can be a little tricky to get right.
+pub fn schedule_and_decode(
+    column_infos: Vec<Arc<ColumnInfo>>,
+    requested_rows: RequestedRows,
+    filter: FilterExpression,
+    column_indices: Vec<u32>,
+    target_schema: Arc<Schema>,
+    config: SchedulerDecoderConfig,
+) -> BoxStream<'static, ReadBatchTask> {
+    if requested_rows.num_rows() == 0 {
+        return stream::empty().boxed();
+    }
+    // For convenience we really want this method to be a synchronous method where all
+    // errors happen on the stream. There is some async initialization that must happen
+    // when creating a scheduler. We wrap that all up in the very first task.
+ stream::once(create_scheduler_decoder( + column_infos, + requested_rows, + filter, + column_indices, + target_schema, + config, + )) + .map(|maybe_stream| match maybe_stream { + // If the initialization failed make it look like a failed task + Ok(stream) => stream, + Err(e) => stream::once(std::future::ready(ReadBatchTask { + num_rows: 0, + task: std::future::ready(Err(e)).boxed(), + })) + .boxed(), + }) + .flatten() + .boxed() +} + /// A decoder for single-column encodings of primitive data (this includes fixed size /// lists of primitive data) /// @@ -1470,6 +1620,7 @@ impl PriorityRange for ListPriorityRange { pub struct SchedulerContext { recv: Option>, io: Arc, + cache: Arc, name: String, path: Vec, path_names: Vec, @@ -1487,9 +1638,10 @@ impl<'a> ScopedSchedulerContext<'a> { } impl SchedulerContext { - pub fn new(io: Arc) -> Self { + pub fn new(io: Arc, cache: Arc) -> Self { Self { io, + cache, recv: None, name: "".to_string(), path: Vec::new(), @@ -1501,6 +1653,10 @@ impl SchedulerContext { &self.io } + pub fn cache(&self) -> &Arc { + &self.cache + } + pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext { self.path.push(index); self.path_names.push(name.to_string()); @@ -1567,6 +1723,11 @@ impl FilterExpression { pub fn no_filter() -> Self { Self(Bytes::new()) } + + /// Returns true if the filter is the same as the [`Self::no_filter`] filter + pub fn is_noop(&self) -> bool { + self.0.is_empty() + } } /// A scheduler for a field's worth of data @@ -1594,6 +1755,12 @@ impl FilterExpression { /// /// See [`crate::decoder`] for more information pub trait FieldScheduler: Send + Sync + std::fmt::Debug { + /// Called at the beginning of scheduling to initialize the scheduler + fn initialize<'a>( + &'a self, + filter: &'a FilterExpression, + context: &'a SchedulerContext, + ) -> BoxFuture<'a, Result<()>>; /// Schedules I/O for the requested portions of the field. /// /// Note: `ranges` must be ordered and non-overlapping @@ -1711,9 +1878,17 @@ pub trait LogicalPageDecoder: std::fmt::Debug + Send { pub async fn decode_batch( batch: &EncodedBatch, filter: &FilterExpression, - field_decoder_strategy: &DecoderMiddlewareChain, + field_decoder_strategy: Arc, ) -> Result { + // The io is synchronous so it shouldn't be possible for any async stuff to still be in progress + // Still, if we just use now_or_never we hit misfires because some futures (channels) need to be + // polled twice. 
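// Illustrative sketch (not part of the patch): driving the new schedule_and_decode entry
// point through a SchedulerDecoderConfig, mirroring how FileReader::do_read_range wires it
// up later in this diff. The function name and batch size are placeholders; the inputs are
// taken as parameters purely for illustration.
fn read_all_rows_example(
    io: Arc<dyn EncodingsIo>,
    cache: Arc<FileMetadataCache>,
    column_infos: Vec<Arc<ColumnInfo>>,
    projection: ReaderProjection,
    num_rows: u64,
) -> BoxStream<'static, ReadBatchTask> {
    let config = SchedulerDecoderConfig {
        batch_size: 1024,
        cache,
        decoder_strategy: Arc::new(DecoderMiddlewareChain::default()),
        io,
    };
    // Any error raised while the scheduler initializes surfaces as a failed task on the
    // returned stream rather than from this call.
    schedule_and_decode(
        column_infos,
        RequestedRows::Ranges(vec![0..num_rows]),
        FilterExpression::no_filter(),
        projection.column_indices.clone(),
        projection.schema.clone(),
        config,
    )
}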
+ let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc; + let cache = Arc::new(FileMetadataCache::with_capacity( + 128 * 1024 * 1024, + CapacityMode::Bytes, + )); let mut decode_scheduler = DecodeBatchScheduler::try_new( batch.schema.as_ref(), &batch.top_level_columns, @@ -1721,8 +1896,11 @@ pub async fn decode_batch( &vec![], batch.num_rows, field_decoder_strategy, - &io_scheduler, - )?; + io_scheduler.clone(), + cache, + filter, + ) + .await?; let (tx, rx) = unbounded_channel(); decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler); #[allow(clippy::single_range_in_vec_init)] diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index c2afe88eaa..0a07e02991 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -744,6 +744,7 @@ impl BatchEncoder { /// An encoded batch of data and a page table describing it /// /// This is returned by [`crate::encoder::encode_batch`] +#[derive(Debug)] pub struct EncodedBatch { pub data: Bytes, pub page_table: Vec>, diff --git a/rust/lance-encoding/src/encodings/logical/list.rs b/rust/lance-encoding/src/encodings/logical/list.rs index f5dbd2ab92..9e6b9cd332 100644 --- a/rust/lance-encoding/src/encodings/logical/list.rs +++ b/rust/lance-encoding/src/encodings/logical/list.rs @@ -16,7 +16,7 @@ use log::trace; use snafu::{location, Location}; use tokio::task::JoinHandle; -use lance_core::{Error, Result}; +use lance_core::{cache::FileMetadataCache, Error, Result}; use crate::{ buffer::LanceBuffer, @@ -314,6 +314,7 @@ fn decode_offsets( /// /// This task does not wait for the items data. That happens on the main decode loop (unless /// we have list of list of ... in which case it happens in the outer indirect decode loop) +#[allow(clippy::too_many_arguments)] async fn indirect_schedule_task( mut offsets_decoder: Box, list_requests: Vec, @@ -321,6 +322,7 @@ async fn indirect_schedule_task( items_scheduler: Arc, items_type: DataType, io: Arc, + cache: Arc, priority: Box, ) -> Result { let num_offsets = offsets_decoder.num_rows(); @@ -357,7 +359,7 @@ async fn indirect_schedule_task( let indirect_root_scheduler = SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone()); let mut indirect_scheduler = - DecodeBatchScheduler::from_scheduler(Arc::new(indirect_root_scheduler), root_fields); + DecodeBatchScheduler::from_scheduler(Arc::new(indirect_root_scheduler), root_fields, cache); let mut root_decoder = indirect_scheduler.new_root_decoder_ranges(&item_ranges); let priority = Box::new(ListPriorityRange::new(priority, offsets.clone())); @@ -440,6 +442,7 @@ impl<'a> SchedulingJob for ListFieldSchedulingJob<'a> { let items_scheduler = self.scheduler.items_scheduler.clone(); let items_type = self.scheduler.items_type.clone(); let io = context.io().clone(); + let cache = context.cache().clone(); // Immediately spawn the indirect scheduling let indirect_fut = tokio::spawn(indirect_schedule_task( @@ -449,6 +452,7 @@ impl<'a> SchedulingJob for ListFieldSchedulingJob<'a> { items_scheduler, items_type, io, + cache, priority.box_clone(), )); @@ -559,6 +563,15 @@ impl FieldScheduler for ListFieldScheduler { fn num_rows(&self) -> u64 { self.offsets_scheduler.num_rows() } + + fn initialize<'a>( + &'a self, + _filter: &'a FilterExpression, + _context: &'a SchedulerContext, + ) -> BoxFuture<'a, Result<()>> { + // 2.0 schedulers do not need to initialize + std::future::ready(Ok(())).boxed() + } } /// As soon as the first call to decode comes in we wait for all 
indirect I/O to diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index fb3064909f..946aee8100 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -225,6 +225,15 @@ impl FieldScheduler for PrimitiveFieldScheduler { ranges.to_vec(), ))) } + + fn initialize<'a>( + &'a self, + _filter: &'a FilterExpression, + _context: &'a SchedulerContext, + ) -> BoxFuture<'a, Result<()>> { + // 2.0 schedulers do not need to initialize + std::future::ready(Ok(())).boxed() + } } pub struct PrimitiveFieldDecoder { diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 2596b72a1b..00fe3effde 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -9,7 +9,7 @@ use std::{ use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray}; use arrow_schema::{DataType, Fields}; -use futures::{future::BoxFuture, FutureExt}; +use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt}; use log::trace; use snafu::{location, Location}; @@ -199,6 +199,26 @@ impl FieldScheduler for SimpleStructScheduler { fn num_rows(&self) -> u64 { self.num_rows } + + fn initialize<'a>( + &'a self, + _filter: &'a FilterExpression, + _context: &'a SchedulerContext, + ) -> BoxFuture<'a, Result<()>> { + let futures = self + .children + .iter() + .map(|child| child.initialize(_filter, _context)) + .collect::>(); + async move { + futures + .map(|res| res.map(|_| ())) + .try_collect::>() + .await?; + Ok(()) + } + .boxed() + } } #[derive(Debug)] diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 4089291f2b..6b6c525249 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -12,7 +12,10 @@ use futures::{future::BoxFuture, FutureExt, StreamExt}; use log::{debug, trace}; use tokio::sync::mpsc::{self, UnboundedSender}; -use lance_core::Result; +use lance_core::{ + cache::{CapacityMode, FileMetadataCache}, + Result, +}; use lance_datagen::{array, gen, ArrayGenerator, RowCount, Seed}; use crate::{ @@ -115,17 +118,22 @@ async fn test_decode( schema: &Schema, column_infos: &[Arc], expected: Option>, - io: &Arc, + io: Arc, schedule_fn: impl FnOnce( DecodeBatchScheduler, UnboundedSender>, ) -> (SimpleStructDecoder, BoxFuture<'static, ()>), ) { let lance_schema = lance_core::datatypes::Schema::try_from(schema).unwrap(); - let decode_and_validate = - DecoderMiddlewareChain::new().add_strategy(Arc::new(CoreFieldDecoderStrategy { + let decode_and_validate = Arc::new(DecoderMiddlewareChain::new().add_strategy(Arc::new( + CoreFieldDecoderStrategy { validate_data: true, - })); + }, + ))); + let cache = Arc::new(FileMetadataCache::with_capacity( + 128 * 1024 * 1024, + CapacityMode::Bytes, + )); let column_indices = column_indices_from_schema(schema); let decode_scheduler = DecodeBatchScheduler::try_new( &lance_schema, @@ -133,9 +141,12 @@ async fn test_decode( column_infos, &Vec::new(), num_rows, - &decode_and_validate, + decode_and_validate, io, + cache, + &FilterExpression::no_filter(), ) + .await .unwrap(); let (tx, rx) = mpsc::unbounded_channel(); @@ -455,7 +466,7 @@ async fn check_round_trip_encoding_inner( &schema, &column_infos, concat_data.clone(), - &scheduler_copy.clone(), + scheduler_copy.clone(), |mut decode_scheduler, tx| { 
#[allow(clippy::single_range_in_vec_init)] let root_decoder = decode_scheduler.new_root_decoder_ranges(&[0..num_rows]); @@ -490,7 +501,7 @@ async fn check_round_trip_encoding_inner( &schema, &column_infos, expected, - &scheduler.clone(), + scheduler.clone(), |mut decode_scheduler, tx| { #[allow(clippy::single_range_in_vec_init)] let root_decoder = decode_scheduler.new_root_decoder_ranges(&[0..num_rows]); @@ -536,7 +547,7 @@ async fn check_round_trip_encoding_inner( &schema, &column_infos, expected, - &scheduler.clone(), + scheduler.clone(), |mut decode_scheduler, tx| { let root_decoder = decode_scheduler.new_root_decoder_indices(&indices); ( diff --git a/rust/lance-file/benches/reader.rs b/rust/lance-file/benches/reader.rs index 9ac8105209..1e6d613c56 100644 --- a/rust/lance-file/benches/reader.rs +++ b/rust/lance-file/benches/reader.rs @@ -8,6 +8,7 @@ use futures::StreamExt; use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression}; use lance_file::v2::{ reader::FileReader, + testing::test_cache, writer::{FileWriter, FileWriterOptions}, }; use lance_io::{ @@ -55,7 +56,8 @@ fn bench_reader(c: &mut Criterion) { let reader = FileReader::try_open( scheduler.clone(), None, - DecoderMiddlewareChain::default(), + Arc::::default(), + &test_cache(), ) .await .unwrap(); diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index ef2c5c7fc6..1980f4cabc 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -13,14 +13,11 @@ use arrow_schema::Schema as ArrowSchema; use byteorder::{ByteOrder, LittleEndian, ReadBytesExt}; use bytes::{Bytes, BytesMut}; use deepsize::{Context, DeepSizeOf}; -use futures::{ - stream::{self, BoxStream}, - Stream, StreamExt, -}; +use futures::{stream::BoxStream, Stream, StreamExt}; use lance_encoding::{ decoder::{ - BatchDecodeStream, ColumnInfo, DecodeBatchScheduler, DecoderMiddlewareChain, - FilterExpression, PageInfo, ReadBatchTask, + schedule_and_decode, ColumnInfo, DecoderMiddlewareChain, FilterExpression, PageInfo, + ReadBatchTask, RequestedRows, SchedulerDecoderConfig, }, encoder::EncodedBatch, version::LanceFileVersion, @@ -31,6 +28,7 @@ use prost::{Message, Name}; use snafu::{location, Location}; use lance_core::{ + cache::FileMetadataCache, datatypes::{Field, Schema}, Error, Result, }; @@ -40,7 +38,6 @@ use lance_io::{ stream::{RecordBatchStream, RecordBatchStreamAdapter}, ReadBatchParams, }; -use tokio::sync::mpsc; use crate::{ datatypes::{Fields, FieldsWithMeta}, @@ -256,7 +253,8 @@ pub struct FileReader { base_projection: ReaderProjection, num_rows: u64, metadata: Arc, - decoder_strategy: DecoderMiddlewareChain, + decoder_strategy: Arc, + cache: Arc, } #[derive(Debug)] @@ -614,7 +612,8 @@ impl FileReader { pub async fn try_open( scheduler: FileScheduler, base_projection: Option, - decoder_strategy: DecoderMiddlewareChain, + decoder_strategy: Arc, + cache: &FileMetadataCache, ) -> Result { let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?); Self::try_open_with_file_metadata( @@ -622,6 +621,7 @@ impl FileReader { base_projection, decoder_strategy, file_metadata, + cache, ) .await } @@ -630,9 +630,12 @@ impl FileReader { pub async fn try_open_with_file_metadata( scheduler: FileScheduler, base_projection: Option, - decoder_strategy: DecoderMiddlewareChain, + decoder_strategy: Arc, file_metadata: Arc, + cache: &FileMetadataCache, ) -> Result { + let cache = Arc::new(cache.with_base_path(scheduler.reader().path().clone())); + if let Some(base_projection) = 
base_projection.as_ref() { Self::validate_projection(base_projection, &file_metadata)?; } @@ -645,6 +648,7 @@ impl FileReader { num_rows, metadata: file_metadata, decoder_strategy, + cache, }) } @@ -668,35 +672,16 @@ impl FileReader { Ok(self.metadata.column_infos.to_vec()) } - fn check_scheduler_on_drop( - stream: BoxStream<'static, ReadBatchTask>, - scheduler_handle: tokio::task::JoinHandle<()>, - ) -> BoxStream<'static, ReadBatchTask> { - // This is a bit weird but we create an "empty stream" that unwraps the scheduler handle (which - // will panic if the scheduler panicked). This let's us check if the scheduler panicked - // when the stream finishes. - let mut scheduler_handle = Some(scheduler_handle); - let check_scheduler = stream::unfold((), move |_| { - let handle = scheduler_handle.take(); - async move { - if let Some(handle) = handle { - handle.await.unwrap(); - } - None - } - }); - stream.chain(check_scheduler).boxed() - } - #[allow(clippy::too_many_arguments)] fn do_read_range( column_infos: Vec>, - scheduler: Arc, + io: Arc, + cache: Arc, num_rows: u64, - decoder_strategy: DecoderMiddlewareChain, + decoder_strategy: Arc, range: Range, batch_size: u32, - projection: &ReaderProjection, + projection: ReaderProjection, filter: FilterExpression, ) -> Result> { debug!( @@ -708,74 +693,56 @@ impl FileReader { projection.schema.fields.len(), ); - if range.is_empty() { - return Err(Error::InvalidInput { - source: format!("Cannot read empty range {:?} from file", range).into(), - location: location!(), - }); - } - - let mut decode_scheduler = DecodeBatchScheduler::try_new( - &projection.schema, - &projection.column_indices, - &column_infos, - &vec![], - num_rows, - &decoder_strategy, - &scheduler, - )?; - - let root_decoder = decode_scheduler.new_root_decoder_ranges(&[range.clone()]); - - let (tx, rx) = mpsc::unbounded_channel(); - - let num_rows_to_read = range.end - range.start; - - let scheduler_handle = tokio::task::spawn(async move { - decode_scheduler.schedule_range(range, &filter, tx, scheduler); - }); + let config = SchedulerDecoderConfig { + batch_size, + cache, + decoder_strategy, + io, + }; - let batches = - BatchDecodeStream::new(rx, batch_size, num_rows_to_read, root_decoder).into_stream(); + let requested_rows = RequestedRows::Ranges(vec![range]); - Ok(Self::check_scheduler_on_drop(batches, scheduler_handle)) + Ok(schedule_and_decode( + column_infos, + requested_rows, + filter, + projection.column_indices, + projection.schema, + config, + )) } fn read_range( &self, range: Range, batch_size: u32, - projection: &ReaderProjection, + projection: ReaderProjection, filter: FilterExpression, ) -> Result> { - // Grab what we need to initialize the stream - let range = range.clone(); - let projection = projection.clone(); - let column_infos = self.collect_columns_from_projection(&projection)?; - let scheduler = self.scheduler.clone(); - let num_rows = self.num_rows; - let decoder_strategy = self.decoder_strategy.clone(); // Create and initialize the stream Self::do_read_range( - column_infos, - scheduler, - num_rows, - decoder_strategy, + self.collect_columns_from_projection(&projection)?, + self.scheduler.clone(), + self.cache.clone(), + self.num_rows, + self.decoder_strategy.clone(), range, batch_size, - &projection, + projection, filter, ) } + #[allow(clippy::too_many_arguments)] fn do_take_rows( column_infos: Vec>, - scheduler: Arc, - num_rows: u64, - decoder_strategy: DecoderMiddlewareChain, + io: Arc, + cache: Arc, + decoder_strategy: Arc, indices: Vec, batch_size: u32, - 
projection: &ReaderProjection, + projection: ReaderProjection, + filter: FilterExpression, ) -> Result> { debug!( "Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}", @@ -786,53 +753,41 @@ impl FileReader { column_infos.iter().map(|ci| ci.index).collect::>() ); - let mut decode_scheduler = DecodeBatchScheduler::try_new( - &projection.schema, - &projection.column_indices, - &column_infos, - &vec![], - num_rows, - &decoder_strategy, - &scheduler, - )?; - - let root_decoder = decode_scheduler.new_root_decoder_indices(&indices); - - let (tx, rx) = mpsc::unbounded_channel(); - - let num_rows_to_read = indices.len() as u64; - - let scheduler_handle = tokio::task::spawn(async move { - decode_scheduler.schedule_take(&indices, &FilterExpression::no_filter(), tx, scheduler) - }); + let config = SchedulerDecoderConfig { + batch_size, + cache, + decoder_strategy, + io, + }; - let batches = - BatchDecodeStream::new(rx, batch_size, num_rows_to_read, root_decoder).into_stream(); + let requested_rows = RequestedRows::Indices(indices); - Ok(Self::check_scheduler_on_drop(batches, scheduler_handle)) + Ok(schedule_and_decode( + column_infos, + requested_rows, + filter, + projection.column_indices, + projection.schema, + config, + )) } fn take_rows( &self, indices: Vec, batch_size: u32, - projection: &ReaderProjection, + projection: ReaderProjection, ) -> Result> { - // Grab what we need to initialize the stream - let projection = projection.clone(); - let column_infos = self.collect_columns_from_projection(&projection)?; - let scheduler = self.scheduler.clone(); - let num_rows = self.num_rows; - let decoder_strategy = self.decoder_strategy.clone(); // Create and initialize the stream Self::do_take_rows( - column_infos, - scheduler, - num_rows, - decoder_strategy, + self.collect_columns_from_projection(&projection)?, + self.scheduler.clone(), + self.cache.clone(), + self.decoder_strategy.clone(), indices, batch_size, - &projection, + projection, + FilterExpression::no_filter(), ) } @@ -850,10 +805,10 @@ impl FileReader { &self, params: ReadBatchParams, batch_size: u32, - projection: &ReaderProjection, + projection: ReaderProjection, filter: FilterExpression, ) -> Result + Send>>> { - Self::validate_projection(projection, &self.metadata)?; + Self::validate_projection(&projection, &self.metadata)?; let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| { if bound > self.num_rows || bound == self.num_rows && inclusive { Err(Error::invalid_input( @@ -939,15 +894,15 @@ impl FileReader { params: ReadBatchParams, batch_size: u32, batch_readahead: u32, - projection: &ReaderProjection, + projection: ReaderProjection, filter: FilterExpression, ) -> Result>> { + let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref())); let tasks_stream = self.read_tasks(params, batch_size, projection, filter)?; let batch_stream = tasks_stream .map(|task| task.task) .buffered(batch_readahead as usize) .boxed(); - let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref())); Ok(Box::pin(RecordBatchStreamAdapter::new( arrow_schema, batch_stream, @@ -970,7 +925,7 @@ impl FileReader { params, batch_size, batch_readahead, - &self.base_projection, + self.base_projection.clone(), filter, ) } @@ -1121,7 +1076,6 @@ pub mod tests { use lance_encoding::{ decoder::{decode_batch, DecodeBatchScheduler, DecoderMiddlewareChain, FilterExpression}, encoder::{encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodingOptions}, - EncodingsIo, }; use 
lance_io::stream::RecordBatchStream; use log::debug; @@ -1129,7 +1083,7 @@ pub mod tests { use crate::v2::{ reader::{EncodedBatchReaderExt, FileReader, ReaderProjection}, - testing::{write_lance_file, FsFixture, WrittenFile}, + testing::{test_cache, write_lance_file, FsFixture, WrittenFile}, writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions}, }; @@ -1211,10 +1165,14 @@ pub mod tests { for read_size in [32, 1024, 1024 * 1024] { let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap(); - let file_reader = - FileReader::try_open(file_scheduler, None, DecoderMiddlewareChain::default()) - .await - .unwrap(); + let file_reader = FileReader::try_open( + file_scheduler, + None, + Arc::::default(), + &test_cache(), + ) + .await + .unwrap(); let schema = file_reader.schema(); assert_eq!(schema.metadata.get("foo").unwrap(), "bar"); @@ -1264,7 +1222,7 @@ pub mod tests { let decoded = decode_batch( &decoded_batch, &FilterExpression::no_filter(), - &DecoderMiddlewareChain::default(), + Arc::::default(), ) .await .unwrap(); @@ -1278,7 +1236,7 @@ pub mod tests { let decoded = decode_batch( &decoded_batch, &FilterExpression::no_filter(), - &DecoderMiddlewareChain::default(), + Arc::::default(), ) .await .unwrap(); @@ -1314,7 +1272,8 @@ pub mod tests { let file_reader = FileReader::try_open( file_scheduler.clone(), None, - DecoderMiddlewareChain::default(), + Arc::::default(), + &test_cache(), ) .await .unwrap(); @@ -1328,7 +1287,7 @@ pub mod tests { lance_io::ReadBatchParams::RangeFull, 1024, 16, - &projection, + projection.clone(), FilterExpression::no_filter(), ) .unwrap(); @@ -1348,7 +1307,8 @@ pub mod tests { let file_reader = FileReader::try_open( file_scheduler.clone(), Some(projection.clone()), - DecoderMiddlewareChain::default(), + Arc::::default(), + &test_cache(), ) .await .unwrap(); @@ -1382,7 +1342,8 @@ pub mod tests { assert!(FileReader::try_open( file_scheduler.clone(), Some(empty_projection), - DecoderMiddlewareChain::default() + Arc::::default(), + &test_cache() ) .await .is_err()); @@ -1401,7 +1362,8 @@ pub mod tests { assert!(FileReader::try_open( file_scheduler.clone(), Some(projection_with_dupes), - DecoderMiddlewareChain::default() + Arc::::default(), + &test_cache() ) .await .is_err()); @@ -1418,7 +1380,8 @@ pub mod tests { let file_reader = FileReader::try_open( file_scheduler.clone(), None, - DecoderMiddlewareChain::default(), + Arc::::default(), + &test_cache(), ) .await .unwrap(); @@ -1439,7 +1402,7 @@ pub mod tests { lance_io::ReadBatchParams::RangeFull, 1024, 16, - &projection, + projection.clone(), FilterExpression::no_filter(), ) .unwrap(); @@ -1466,7 +1429,8 @@ pub mod tests { let file_reader = FileReader::try_open( file_scheduler.clone(), None, - DecoderMiddlewareChain::default(), + Arc::::default(), + &test_cache(), ) .await .unwrap(); @@ -1496,7 +1460,8 @@ pub mod tests { let file_reader = FileReader::try_open( file_scheduler.clone(), None, - DecoderMiddlewareChain::default(), + Arc::::default(), + &test_cache(), ) .await .unwrap(); @@ -1542,7 +1507,8 @@ pub mod tests { let file_reader = FileReader::try_open( file_scheduler.clone(), None, - DecoderMiddlewareChain::default(), + Arc::::default(), + &test_cache(), ) .await .unwrap(); @@ -1557,9 +1523,12 @@ pub mod tests { &column_infos, &vec![], total_rows as u64, - &DecoderMiddlewareChain::default(), - &(file_reader.scheduler.clone() as Arc), + Arc::::default(), + file_reader.scheduler.clone(), + test_cache(), + &FilterExpression::no_filter(), ) + .await .unwrap(); let range = 0..total_rows as u64; 
@@ -1612,7 +1581,8 @@ pub mod tests {
         let file_reader = FileReader::try_open(
             file_scheduler.clone(),
             None,
-            DecoderMiddlewareChain::default(),
+            Arc::<DecoderMiddlewareChain>::default(),
+            &test_cache(),
         )
         .await
         .unwrap();
diff --git a/rust/lance-file/src/v2/testing.rs b/rust/lance-file/src/v2/testing.rs
index 11ae8fad20..a484e85b43 100644
--- a/rust/lance-file/src/v2/testing.rs
+++ b/rust/lance-file/src/v2/testing.rs
@@ -6,7 +6,10 @@ use std::sync::Arc;
 use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::ArrowError;
 use futures::TryStreamExt;
-use lance_core::datatypes::Schema;
+use lance_core::{
+    cache::{CapacityMode, FileMetadataCache},
+    datatypes::Schema,
+};
 use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
 use lance_io::{
     object_store::ObjectStore,
@@ -79,13 +82,20 @@ pub async fn write_lance_file(
     }
 }
 
+pub fn test_cache() -> Arc<FileMetadataCache> {
+    Arc::new(FileMetadataCache::with_capacity(
+        128 * 1024 * 1024,
+        CapacityMode::Bytes,
+    ))
+}
+
 pub async fn read_lance_file(
     fs: &FsFixture,
-    decoder_middleware: DecoderMiddlewareChain,
+    decoder_middleware: Arc<DecoderMiddlewareChain>,
     filter: FilterExpression,
 ) -> Vec<RecordBatch> {
     let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
-    let file_reader = FileReader::try_open(file_scheduler, None, decoder_middleware)
+    let file_reader = FileReader::try_open(file_scheduler, None, decoder_middleware, &test_cache())
         .await
         .unwrap();
 
@@ -101,7 +111,7 @@
 pub async fn count_lance_file(
     fs: &FsFixture,
-    decoder_middleware: DecoderMiddlewareChain,
+    decoder_middleware: Arc<DecoderMiddlewareChain>,
     filter: FilterExpression,
 ) -> usize {
     read_lance_file(fs, decoder_middleware, filter)
diff --git a/rust/lance-index/benches/inverted.rs b/rust/lance-index/benches/inverted.rs
index c7e056636b..4a3102f0bf 100644
--- a/rust/lance-index/benches/inverted.rs
+++ b/rust/lance-index/benches/inverted.rs
@@ -12,6 +12,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use futures::stream;
 use itertools::Itertools;
+use lance_core::cache::FileMetadataCache;
 use lance_core::ROW_ID;
 use lance_index::prefilter::NoFilter;
 use lance_index::scalar::inverted::{InvertedIndex, InvertedIndexBuilder};
@@ -29,8 +30,13 @@ fn bench_inverted(c: &mut Criterion) {
     let tempdir = tempfile::tempdir().unwrap();
     let index_dir = Path::from_filesystem_path(tempdir.path()).unwrap();
-    let store = rt
-        .block_on(async { Arc::new(LanceIndexStore::new(ObjectStore::local(), index_dir, None)) });
+    let store = rt.block_on(async {
+        Arc::new(LanceIndexStore::new(
+            ObjectStore::local(),
+            index_dir,
+            FileMetadataCache::no_cache(),
+        ))
+    });
 
     let mut builder = InvertedIndexBuilder::default();
 
     // generate 2000 different tokens
diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs
index 782752e155..8f3c088761 100644
--- a/rust/lance-index/src/scalar/inverted/builder.rs
+++ b/rust/lance-index/src/scalar/inverted/builder.rs
@@ -489,6 +489,7 @@ mod tests {
     use arrow_array::{Array, ArrayRef, GenericStringArray, RecordBatch, UInt64Array};
     use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
     use futures::stream;
+    use lance_core::cache::{CapacityMode, FileMetadataCache};
     use lance_core::ROW_ID_FIELD;
     use lance_io::object_store::ObjectStore;
     use object_store::path::Path;
@@ -503,7 +504,8 @@ mod tests {
     ) -> Arc {
         let tempdir = tempfile::tempdir().unwrap();
         let index_dir = Path::from_filesystem_path(tempdir.path()).unwrap();
-        let store = LanceIndexStore::new(ObjectStore::local(), index_dir, None);
+        let cache = FileMetadataCache::with_capacity(128 * 1024 * 1024, CapacityMode::Bytes);
+        let store = LanceIndexStore::new(ObjectStore::local(), index_dir, cache);
         let params = super::InvertedIndexParams::default().with_position(with_position);
         let mut invert_index = super::InvertedIndexBuilder::new(params);
diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs
index 95331087b2..09a6329784 100644
--- a/rust/lance-index/src/scalar/lance_format.rs
+++ b/rust/lance-index/src/scalar/lance_format.rs
@@ -37,7 +37,7 @@ use super::{IndexReader, IndexStore, IndexWriter};
 pub struct LanceIndexStore {
     object_store: Arc<ObjectStore>,
     index_dir: Path,
-    metadata_cache: Option<FileMetadataCache>,
+    metadata_cache: FileMetadataCache,
     scheduler: Arc<ScanScheduler>,
     use_legacy_format: bool,
 }
@@ -55,7 +55,7 @@ impl LanceIndexStore {
     pub fn new(
         object_store: ObjectStore,
         index_dir: Path,
-        metadata_cache: Option<FileMetadataCache>,
+        metadata_cache: FileMetadataCache,
     ) -> Self {
         let object_store = Arc::new(object_store);
         let scheduler = ScanScheduler::new(
@@ -169,7 +169,7 @@ impl IndexReader for v2::reader::FileReader {
                 ReadBatchParams::Range(range),
                 u32::MAX,
                 u32::MAX,
-                &projection,
+                projection,
                 FilterExpression::no_filter(),
             )?
             .try_collect::<Vec<_>>()
@@ -236,7 +236,8 @@ impl IndexStore for LanceIndexStore {
         match v2::reader::FileReader::try_open(
             file_scheduler,
             None,
-            DecoderMiddlewareChain::default(),
+            Arc::<DecoderMiddlewareChain>::default(),
+            &self.metadata_cache,
         )
         .await
         {
@@ -248,7 +249,7 @@ impl IndexStore for LanceIndexStore {
             let file_reader = FileReader::try_new_self_described(
                 &self.object_store,
                 &path,
-                self.metadata_cache.as_ref(),
+                Some(&self.metadata_cache),
             )
             .await?;
             Ok(Arc::new(file_reader))
@@ -312,7 +313,7 @@ mod tests {
     use arrow_select::take::TakeOptions;
     use datafusion::physical_plan::SendableRecordBatchStream;
    use datafusion_common::ScalarValue;
-    use lance_core::utils::mask::RowIdTreeMap;
+    use lance_core::{cache::CapacityMode, utils::mask::RowIdTreeMap};
     use lance_datagen::{array, gen, ArrayGeneratorExt, BatchCount, ByteCount, RowCount};
     use tempfile::{tempdir, TempDir};
@@ -320,19 +321,22 @@
         let test_path: &Path = tempdir.path();
         let (object_store, test_path) =
             ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
+        let cache = FileMetadataCache::with_capacity(128 * 1024 * 1024, CapacityMode::Bytes);
         Arc::new(LanceIndexStore::new(
             object_store,
             test_path.to_owned(),
-            None,
+            cache,
         ))
     }
 
     fn legacy_test_store(tempdir: &TempDir) -> Arc {
         let test_path: &Path = tempdir.path();
+        let cache = FileMetadataCache::with_capacity(128 * 1024 * 1024, CapacityMode::Bytes);
         let (object_store, test_path) =
             ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
         Arc::new(
-            LanceIndexStore::new(object_store, test_path.to_owned(), None).with_legacy_format(true),
+            LanceIndexStore::new(object_store, test_path.to_owned(), cache)
+                .with_legacy_format(true),
         )
     }
diff --git a/rust/lance-index/src/vector/ivf/shuffler.rs b/rust/lance-index/src/vector/ivf/shuffler.rs
index abbcf41fd7..f5998e4c3c 100644
--- a/rust/lance-index/src/vector/ivf/shuffler.rs
+++ b/rust/lance-index/src/vector/ivf/shuffler.rs
@@ -27,6 +27,7 @@ use arrow_schema::{DataType, Field, Fields};
 use futures::stream::repeat_with;
 use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
 use lance_arrow::RecordBatchExt;
+use lance_core::cache::{CapacityMode, FileMetadataCache};
 use lance_core::utils::tokio::get_num_compute_intensive_cpus;
 use lance_core::{datatypes::Schema, Error, Result, ROW_ID};
 use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
@@ -511,7 +512,11 @@ impl IvfShuffler {
                 let scheduler_config = SchedulerConfig::max_bandwidth(&object_store);
                 let scheduler = ScanScheduler::new(object_store.into(), scheduler_config);
                 let file = scheduler.open_file(&path).await?;
-                let reader = Lancev2FileReader::try_open(file, None, Default::default()).await?;
+                let cache =
+                    FileMetadataCache::with_capacity(128 * 1024 * 1024, CapacityMode::Bytes);
+
+                let reader =
+                    Lancev2FileReader::try_open(file, None, Default::default(), &cache).await?;
                 let num_batches = reader.metadata().num_rows / (SHUFFLE_BATCH_SIZE as u64);
                 total_batches.push(num_batches as usize);
             }
@@ -559,7 +564,13 @@ impl IvfShuffler {
                 }
             } else {
                 let file = scheduler.open_file(&path).await?;
-                let reader = Lancev2FileReader::try_open(file, None, Default::default()).await?;
+                let reader = Lancev2FileReader::try_open(
+                    file,
+                    None,
+                    Default::default(),
+                    &FileMetadataCache::no_cache(),
+                )
+                .await?;
                 let mut stream = reader
                     .read_stream(
                         lance_io::ReadBatchParams::Range(
@@ -624,7 +635,13 @@ impl IvfShuffler {
             let scheduler_config = SchedulerConfig::max_bandwidth(&object_store);
             let scheduler = ScanScheduler::new(Arc::new(object_store), scheduler_config);
             let file = scheduler.open_file(&path).await?;
-            let reader = Lancev2FileReader::try_open(file, None, Default::default()).await?;
+            let reader = Lancev2FileReader::try_open(
+                file,
+                None,
+                Default::default(),
+                &FileMetadataCache::no_cache(),
+            )
+            .await?;
             reader
                 .read_stream(
                     lance_io::ReadBatchParams::Range(
@@ -795,7 +812,8 @@ impl IvfShuffler {
         let reader = lance_file::v2::reader::FileReader::try_open(
             file_scheduler,
             None,
-            DecoderMiddlewareChain::default(),
+            Arc::<DecoderMiddlewareChain>::default(),
+            &FileMetadataCache::no_cache(),
         )
         .await?;
         let stream = reader
diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs
index 97ca35c2ac..b35d2d4a3a 100644
--- a/rust/lance-index/src/vector/v3/shuffler.rs
+++ b/rust/lance-index/src/vector/v3/shuffler.rs
@@ -12,6 +12,7 @@ use future::join_all;
 use futures::prelude::*;
 use lance_arrow::RecordBatchExt;
 use lance_core::{
+    cache::FileMetadataCache,
     utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu},
     Error, Result,
 };
@@ -247,7 +248,8 @@ impl ShuffleReader for IvfShufflerReader {
         let reader = FileReader::try_open(
             self.scheduler.open_file(&partition_path).await?,
             None,
-            DecoderMiddlewareChain::default(),
+            Arc::<DecoderMiddlewareChain>::default(),
+            &FileMetadataCache::no_cache(),
         )
         .await?;
         let schema = reader.schema().as_ref().into();
diff --git a/rust/lance/benches/scalar_index.rs b/rust/lance/benches/scalar_index.rs
index 66df9c23ca..58c261ccf5 100644
--- a/rust/lance/benches/scalar_index.rs
+++ b/rust/lance/benches/scalar_index.rs
@@ -12,7 +12,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
 use datafusion::{physical_plan::SendableRecordBatchStream, scalar::ScalarValue};
 use futures::TryStreamExt;
 use lance::{io::ObjectStore, Dataset};
-use lance_core::Result;
+use lance_core::{cache::FileMetadataCache, Result};
 use lance_datafusion::utils::reader_to_stream;
 use lance_datagen::{array, gen, BatchCount, RowCount};
 use lance_index::scalar::{
@@ -65,14 +65,21 @@ impl BenchmarkFixture {
         let test_path = tempdir.path();
         let (object_store, test_path) =
             ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
-        Arc::new(LanceIndexStore::new(object_store, test_path, None))
+        Arc::new(LanceIndexStore::new(
+            object_store,
+            test_path,
+            FileMetadataCache::no_cache(),
+        ))
     }
 
     fn legacy_test_store(tempdir: &TempDir) -> Arc {
         let test_path = tempdir.path();
         let (object_store, test_path) =
             ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
-        Arc::new(LanceIndexStore::new(object_store, test_path, None).with_legacy_format(true))
+        Arc::new(
+            LanceIndexStore::new(object_store, test_path, FileMetadataCache::no_cache())
+                .with_legacy_format(true),
+        )
     }
 
     async fn write_baseline_data(tempdir: &TempDir) -> Arc {
diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs
index 90305c0758..8780834f7f 100644
--- a/rust/lance/src/dataset/fragment.rs
+++ b/rust/lance/src/dataset/fragment.rs
@@ -299,7 +299,7 @@ mod v2_adapter {
             .read_tasks(
                 ReadBatchParams::Range(range.start as usize..range.end as usize),
                 batch_size,
-                &projection,
+                projection,
                 FilterExpression::no_filter(),
             )?
             .map(|v2_task| ReadBatchTask {
@@ -323,7 +323,7 @@ mod v2_adapter {
             .read_tasks(
                 ReadBatchParams::RangeFull,
                 batch_size,
-                &projection,
+                projection,
                 FilterExpression::no_filter(),
             )?
             .map(|v2_task| ReadBatchTask {
@@ -349,7 +349,7 @@ mod v2_adapter {
             .read_tasks(
                 ReadBatchParams::Indices(indices),
                 batch_size,
-                &projection,
+                projection,
                 FilterExpression::no_filter(),
             )?
             .map(|v2_task| ReadBatchTask {
@@ -454,7 +454,8 @@ impl FileFragment {
         let reader = v2::reader::FileReader::try_open(
             file_scheduler,
             None,
-            DecoderMiddlewareChain::default(),
+            Arc::<DecoderMiddlewareChain>::default(),
+            &dataset.session.file_metadata_cache,
         )
         .await?;
         // If the schemas are not compatible we can't calculate field id offsets
@@ -639,8 +640,9 @@ impl FileFragment {
             v2::reader::FileReader::try_open_with_file_metadata(
                 file_scheduler,
                 None,
-                DecoderMiddlewareChain::default(),
+                Arc::<DecoderMiddlewareChain>::default(),
                 file_metadata,
+                &self.dataset.session.file_metadata_cache,
             )
             .await?,
         );
diff --git a/rust/lance/src/dataset/index.rs b/rust/lance/src/dataset/index.rs
index 1d5a15db86..56c211d4c7 100644
--- a/rust/lance/src/dataset/index.rs
+++ b/rust/lance/src/dataset/index.rs
@@ -80,7 +80,7 @@ impl LanceIndexStoreExt for LanceIndexStore {
         Self::new(
             dataset.object_store.as_ref().clone(),
             index_dir,
-            Some(dataset.session.file_metadata_cache.clone()),
+            dataset.session.file_metadata_cache.clone(),
         )
     }
 }
diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs
index 1918292f68..c5a45111ef 100644
--- a/rust/lance/src/index.rs
+++ b/rust/lance/src/index.rs
@@ -675,8 +675,13 @@ impl DatasetIndexInternalExt for Dataset {
             SchedulerConfig::max_bandwidth(&self.object_store),
         );
         let file = scheduler.open_file(&index_file).await?;
-        let reader =
-            v2::reader::FileReader::try_open(file, None, Default::default()).await?;
+        let reader = v2::reader::FileReader::try_open(
+            file,
+            None,
+            Default::default(),
+            &self.session.file_metadata_cache,
+        )
+        .await?;
         let index_metadata = reader
             .schema()
             .metadata
diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs
index 2ee7e00509..1dcee9c493 100644
--- a/rust/lance/src/index/vector/builder.rs
+++ b/rust/lance/src/index/vector/builder.rs
@@ -8,6 +8,7 @@ use arrow_array::{RecordBatch, UInt64Array};
 use futures::prelude::stream::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use lance_arrow::RecordBatchExt;
+use lance_core::cache::FileMetadataCache;
 use lance_core::utils::tokio::get_num_compute_intensive_cpus;
 use lance_core::{Error, Result, ROW_ID_FIELD};
 use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
@@ -499,7 +500,8 @@ impl IvfIndexBuilder
         let reader = FileReader::try_open(
             scheduler.open_file(&storage_part_path).await?,
             None,
-            DecoderMiddlewareChain::default(),
+            Arc::<DecoderMiddlewareChain>::default(),
+            &FileMetadataCache::no_cache(),
         )
         .await?;
         let batches = reader
@@ -531,7 +533,8 @@ impl IvfIndexBuilder
         let reader = FileReader::try_open(
             scheduler.open_file(&index_part_path).await?,
             None,
-            DecoderMiddlewareChain::default(),
+            Arc::<DecoderMiddlewareChain>::default(),
+            &FileMetadataCache::no_cache(),
         )
         .await?;
         let batches = reader
diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs
index 89adec3009..2a221246ce 100644
--- a/rust/lance/src/index/vector/ivf/v2.rs
+++ b/rust/lance/src/index/vector/ivf/v2.rs
@@ -21,6 +21,7 @@ use async_trait::async_trait;
 use deepsize::DeepSizeOf;
 use futures::prelude::stream::{self, StreamExt, TryStreamExt};
 use lance_arrow::RecordBatchExt;
+use lance_core::cache::FileMetadataCache;
 use lance_core::utils::tokio::get_num_compute_intensive_cpus;
 use lance_core::{cache::DEFAULT_INDEX_CACHE_SIZE, Error, Result};
 use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
@@ -117,12 +118,17 @@ impl IVFIndex {
         let scheduler_config = SchedulerConfig::max_bandwidth(&object_store);
         let scheduler = ScanScheduler::new(object_store, scheduler_config);
+        let file_metadata_cache = session
+            .upgrade()
+            .map(|sess| sess.file_metadata_cache.clone())
+            .unwrap_or_else(FileMetadataCache::no_cache);
         let index_reader = FileReader::try_open(
             scheduler
                 .open_file(&index_dir.child(uuid.as_str()).child(INDEX_FILE_NAME))
                 .await?,
             None,
-            DecoderMiddlewareChain::default(),
+            Arc::<DecoderMiddlewareChain>::default(),
+            &file_metadata_cache,
         )
         .await?;
         let index_metadata: IndexMetadata = serde_json::from_str(
@@ -173,7 +179,8 @@ impl IVFIndex {
             )
             .await?,
             None,
-            DecoderMiddlewareChain::default(),
+            Arc::<DecoderMiddlewareChain>::default(),
+            &file_metadata_cache,
         )
         .await?;
         let storage = IvfQuantizationStorage::try_new(storage_reader).await?;