From 8a7806b7c2afdfe6be6cbfa36e020f81f0a08190 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Fri, 20 Sep 2024 13:58:25 -0700
Subject: [PATCH] Fix unit tests

---
 python/src/file.rs                         | 12 ++-
 rust/lance-datafusion/src/substrait.rs     | 39 ++++++----
 rust/lance-encoding-datafusion/src/zone.rs | 77 ++++++++++++-------
 rust/lance-encoding/benches/decoder.rs     | 66 ++++++++--------
 rust/lance-encoding/src/decoder.rs         | 45 +++++++----
 rust/lance-encoding/src/encoder.rs         |  1 +
 .../src/encodings/logical/list.rs          |  2 +-
 .../src/encodings/logical/primitive.rs     |  2 +-
 .../src/encodings/logical/struct.rs        | 19 ++++-
 rust/lance-encoding/src/testing.rs         |  1 +
 rust/lance-file/src/v2/reader.rs           |  3 +
 11 files changed, 169 insertions(+), 98 deletions(-)

diff --git a/python/src/file.rs b/python/src/file.rs
index 938428b9b9..9d14d13ff0 100644
--- a/python/src/file.rs
+++ b/python/src/file.rs
@@ -19,6 +19,7 @@ use arrow_schema::Schema as ArrowSchema;
 use bytes::Bytes;
 use futures::stream::StreamExt;
 use lance::io::{ObjectStore, RecordBatchStream};
+use lance_core::cache::FileMetadataCache;
 use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
 use lance_file::{
     v2::{
@@ -331,9 +332,14 @@ impl LanceFileReader {
             },
         );
         let file = scheduler.open_file(&path).await.infer_error()?;
-        let inner = FileReader::try_open(file, None, Arc::<DecoderMiddlewareChain>::default())
-            .await
-            .infer_error()?;
+        let inner = FileReader::try_open(
+            file,
+            None,
+            Arc::<DecoderMiddlewareChain>::default(),
+            &FileMetadataCache::no_cache(),
+        )
+        .await
+        .infer_error()?;
         Ok(Self {
             inner: Arc::new(inner),
         })
diff --git a/rust/lance-datafusion/src/substrait.rs b/rust/lance-datafusion/src/substrait.rs
index a2db60b871..7c86479622 100644
--- a/rust/lance-datafusion/src/substrait.rs
+++ b/rust/lance-datafusion/src/substrait.rs
@@ -271,22 +271,31 @@ pub async fn parse_substrait(expr: &[u8], input_schema: Arc<Schema>) -> Result<Expr> {
 [hunk body lost in extraction; only a trailing "=> col_info," fragment survives]
 }
diff --git a/rust/lance-encoding-datafusion/src/zone.rs b/rust/lance-encoding-datafusion/src/zone.rs
--- a/rust/lance-encoding-datafusion/src/zone.rs
+++ b/rust/lance-encoding-datafusion/src/zone.rs
@@ -185,6 +186,13 @@ struct ZoneMap {
     items: Vec<(Expr, NullableInterval)>,
 }
 
+#[derive(Debug)]
+struct InitializedState {
+    zone_maps: Vec<ZoneMap>,
+    filter: Option<Expr>,
+    df_schema: Option<Arc<DFSchema>>,
+}
+
 /// A top level scheduler that refines the requested range based on
 /// pushdown filtering with zone maps
 #[derive(Debug)]
@@ -195,9 +203,7 @@ pub struct ZoneMapsFieldScheduler {
     pushdown_buffers: HashMap<u32, UnloadedPushdown>,
     rows_per_zone: u32,
     num_rows: u64,
-    zone_maps: Vec<ZoneMap>,
-    filter: Option<Expr>,
-    df_schema: Option<Arc<DFSchema>>,
+    initialized_state: Mutex<Option<InitializedState>>,
 }
 
 impl ZoneMapsFieldScheduler {
@@ -215,9 +221,7 @@ impl ZoneMapsFieldScheduler {
             rows_per_zone,
             num_rows,
             // These are set during initialization
-            zone_maps: Vec::new(),
-            filter: None,
-            df_schema: None,
+            initialized_state: Mutex::new(None),
         }
     }
 
@@ -233,13 +237,14 @@ impl ZoneMapsFieldScheduler {
             .map(|pushdown| pushdown.position..pushdown.position + pushdown.size)
             .collect();
         let buffers = io.submit_request(ranges, 0).await?;
-        let maps = buffers
-            .into_iter()
-            .zip(pushdowns.iter())
-            .map(|(buffer, pushdown)| {
-                self.parse_zone(buffer, &pushdown.data_type, &pushdown.column)
-            })
-            .collect::<Result<Vec<_>>>()?;
+        let mut maps = Vec::new();
+        for (buffer, pushdown) in buffers.into_iter().zip(pushdowns.iter()) {
+            // There's no point in running this in parallel since it's actually synchronous
+            let map = self
+                .parse_zone(buffer, &pushdown.data_type, &pushdown.column)
+                .await?;
+            maps.push(map);
+        }
         // At this point each item in `maps` is a vector of guarantees for a single field
         // We need to transpose this so that each item is a vector of guarantees for a single zone
         let zone_maps = transpose2(maps)
@@ -269,11 +274,15 @@ impl ZoneMapsFieldScheduler {
     }
 
     async fn do_initialize(
-        &mut self,
+        &self,
         io: &dyn EncodingsIo,
         cache: &FileMetadataCache,
         filter: &FilterExpression,
     ) -> Result<()> {
+        if filter.is_noop() {
+            return Ok(());
+        }
+
         let arrow_schema = ArrowSchema::from(self.schema.as_ref());
         let df_schema = DFSchema::try_from(arrow_schema.clone())?;
         let df_filter = filter.substrait_to_df(Arc::new(arrow_schema))?;
@@ -281,21 +290,31 @@ impl ZoneMapsFieldScheduler {
         let columns = Planner::column_names_in_expr(&df_filter);
         let referenced_schema = self.schema.project(&columns)?;
 
-        self.df_schema = Some(Arc::new(df_schema));
-        self.zone_maps = self.load_maps(io, cache, &referenced_schema).await?;
-        self.filter = Some(df_filter);
+        let df_schema = Some(Arc::new(df_schema));
+        let zone_maps = self.load_maps(io, cache, &referenced_schema).await?;
+        let filter = Some(df_filter);
+
+        let state = InitializedState {
+            zone_maps,
+            filter,
+            df_schema,
+        };
+        let mut initialized_state = self.initialized_state.lock().unwrap();
+        *initialized_state = Some(state);
         Ok(())
     }
 
     fn create_filter(&self) -> Result<impl Fn(u64) -> bool + '_> {
         Ok(move |zone_idx| {
-            let zone_map = &self.zone_maps[zone_idx as usize];
+            let state = self.initialized_state.lock().unwrap();
+            let state = state.as_ref().unwrap();
+            let zone_map = &state.zone_maps[zone_idx as usize];
             let props = ExecutionProps::new();
             let context =
-                SimplifyContext::new(&props).with_schema(self.df_schema.as_ref().unwrap().clone());
+                SimplifyContext::new(&props).with_schema(state.df_schema.as_ref().unwrap().clone());
             let mut simplifier = ExprSimplifier::new(context);
             simplifier = simplifier.with_guarantees(zone_map.items.clone());
-            match simplifier.simplify(self.filter.as_ref().unwrap().clone()) {
+            match simplifier.simplify(state.filter.as_ref().unwrap().clone()) {
                 Ok(expr) => match expr {
                     // Predicate, given guarantees, is always false, we can skip the zone
                     Expr::Literal(ScalarValue::Boolean(Some(false))) => false,
@@ -350,7 +369,7 @@ impl ZoneMapsFieldScheduler {
         guarantees
     }
 
-    fn parse_zone(
+    async fn parse_zone(
         &self,
         buffer: Bytes,
         data_type: &DataType,
@@ -367,7 +386,8 @@ impl ZoneMapsFieldScheduler {
             &zone_maps_batch,
             &FilterExpression::no_filter(),
             Arc::<DecoderMiddlewareChain>::default(),
-        )?;
+        )
+        .await?;
 
         Ok(Self::extract_guarantees(
             &zone_maps_batch,
@@ -419,7 +439,7 @@ impl SchedulingJob for EmptySchedulingJob {
 }
 
 impl FieldScheduler for ZoneMapsFieldScheduler {
     fn initialize<'a>(
-        &'a mut self,
+        &'a self,
         filter: &'a FilterExpression,
         context: &'a SchedulerContext,
     ) -> BoxFuture<'a, Result<()>> {
@@ -435,6 +455,9 @@ impl FieldScheduler for ZoneMapsFieldScheduler {
         ranges: &[std::ops::Range<u64>],
         filter: &FilterExpression,
     ) -> Result<Box<dyn SchedulingJob + 'a>> {
+        if filter.is_noop() {
+            return self.inner.schedule_ranges(ranges, filter);
+        }
         let zone_filter_fn = self.create_filter()?;
         let zone_filter = ZoneMapsFilter::new(zone_filter_fn, self.rows_per_zone as u64);
         let ranges = zone_filter.refine_ranges(ranges);
diff --git a/rust/lance-encoding/benches/decoder.rs b/rust/lance-encoding/benches/decoder.rs
index 99e6de627e..7c22bb71e1 100644
--- a/rust/lance-encoding/benches/decoder.rs
+++ b/rust/lance-encoding/benches/decoder.rs
@@ -84,12 +84,13 @@ fn bench_decode(c: &mut Criterion) {
         let func_name = format!("{:?}", data_type).to_lowercase();
         group.bench_function(func_name, |b| {
             b.iter(|| {
-                let batch = lance_encoding::decoder::decode_batch(
-                    &encoded,
-                    &FilterExpression::no_filter(),
-                    Arc::<DecoderMiddlewareChain>::default(),
-                )
-                .unwrap();
+                let batch = rt
+                    .block_on(lance_encoding::decoder::decode_batch(
+                        &encoded,
+                        &FilterExpression::no_filter(),
+                        Arc::<DecoderMiddlewareChain>::default(),
+                    ))
+                    .unwrap();
                 assert_eq!(data.num_rows(), batch.num_rows());
             })
         });
@@ -122,12 +123,13 @@ fn bench_decode_fsl(c: &mut Criterion) {
         let func_name = format!("{:?}", data_type).to_lowercase();
         group.bench_function(func_name, |b| {
             b.iter(|| {
-                let batch = lance_encoding::decoder::decode_batch(
-                    &encoded,
-                    &FilterExpression::no_filter(),
-                    Arc::<DecoderMiddlewareChain>::default(),
-                )
-                .unwrap();
+                let batch = rt
+                    .block_on(lance_encoding::decoder::decode_batch(
+                        &encoded,
+                        &FilterExpression::no_filter(),
+                        Arc::<DecoderMiddlewareChain>::default(),
+                    ))
+                    .unwrap();
                 assert_eq!(data.num_rows(), batch.num_rows());
             })
         });
@@ -177,12 +179,13 @@ fn bench_decode_str_with_dict_encoding(c: &mut Criterion) {
         let func_name = format!("{:?}", data_type).to_lowercase();
         group.bench_function(func_name, |b| {
             b.iter(|| {
-                let batch = lance_encoding::decoder::decode_batch(
-                    &encoded,
-                    &FilterExpression::no_filter(),
-                    Arc::<DecoderMiddlewareChain>::default(),
-                )
-                .unwrap();
+                let batch = rt
+                    .block_on(lance_encoding::decoder::decode_batch(
+                        &encoded,
+                        &FilterExpression::no_filter(),
+                        Arc::<DecoderMiddlewareChain>::default(),
+                    ))
+                    .unwrap();
                 assert_eq!(data.num_rows(), batch.num_rows());
             })
         });
@@ -215,7 +218,6 @@ fn bench_decode_packed_struct(c: &mut Criterion) {
         .iter()
         .map(|field| {
             if matches!(field.data_type(), &DataType::Struct(_)) {
-                println!("Match");
                 let mut metadata = HashMap::new();
                 metadata.insert("packed".to_string(), "true".to_string());
                 let field =
@@ -246,12 +248,13 @@ fn bench_decode_packed_struct(c: &mut Criterion) {
     let func_name = "struct";
     group.bench_function(func_name, |b| {
         b.iter(|| {
-            let batch = lance_encoding::decoder::decode_batch(
-                &encoded,
-                &FilterExpression::no_filter(),
-                Arc::<DecoderMiddlewareChain>::default(),
-            )
-            .unwrap();
+            let batch = rt
+                .block_on(lance_encoding::decoder::decode_batch(
+                    &encoded,
+                    &FilterExpression::no_filter(),
+                    Arc::<DecoderMiddlewareChain>::default(),
+                ))
+                .unwrap();
             assert_eq!(data.num_rows(), batch.num_rows());
         })
     });
@@ -293,12 +296,13 @@ fn bench_decode_str_with_fixed_size_binary_encoding(c: &mut Criterion) {
     let func_name = "fixed-utf8".to_string();
    group.bench_function(func_name, |b| {
         b.iter(|| {
-            let batch = lance_encoding::decoder::decode_batch(
-                &encoded,
-                &FilterExpression::no_filter(),
-                Arc::<DecoderMiddlewareChain>::default(),
-            )
-            .unwrap();
+            let batch = rt
+                .block_on(lance_encoding::decoder::decode_batch(
+                    &encoded,
+                    &FilterExpression::no_filter(),
+                    Arc::<DecoderMiddlewareChain>::default(),
+                ))
+                .unwrap();
             assert_eq!(data.num_rows(), batch.num_rows());
         })
     });
diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs
index 4b34e89d9f..8229a7ac27 100644
--- a/rust/lance-encoding/src/decoder.rs
+++ b/rust/lance-encoding/src/decoder.rs
@@ -888,7 +888,9 @@ impl DecodeBatchScheduler {
         decoder_strategy: Arc<DecoderMiddlewareChain>,
         io: Arc<dyn EncodingsIo>,
         cache: Arc<FileMetadataCache>,
+        filter: &FilterExpression,
     ) -> Result<Self> {
+        assert!(num_rows > 0);
         let buffers = FileBuffers {
             positions_and_sizes: file_buffer_positions_and_sizes,
         };
@@ -904,14 +906,22 @@ impl DecodeBatchScheduler {
         let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
         let root_type = DataType::Struct(root_fields.clone());
         let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
+        // root_field.children and schema.fields should be identical at this point but the latter
+        // has field ids and the former does not.  This line restores that.
+        // TODO: Is there another way to create the root field without forcing a trip through arrow?
+        root_field.children = schema.fields.clone();
         root_field
             .metadata
             .insert("__lance_decoder_root".to_string(), "true".to_string());
         let (_, root_scheduler) = decoder_strategy
-            .cursor(io)
+            .cursor(io.clone())
             .start(&root_field, &mut column_iter, buffers)?;
         let root_scheduler = root_scheduler?;
+
+        let context = SchedulerContext::new(io, cache.clone());
+        root_scheduler.initialize(filter, &context).await?;
+
         Ok(Self {
             root_scheduler,
             root_fields,
@@ -1356,6 +1366,7 @@ async fn create_scheduler_decoder(
         config.decoder_strategy,
         config.io.clone(),
         config.cache,
+        &filter,
     )
     .await?;
@@ -1397,6 +1408,9 @@ pub fn schedule_and_decode(
     target_schema: Arc<Schema>,
     config: SchedulerDecoderConfig,
 ) -> BoxStream<'static, ReadBatchTask> {
+    if requested_rows.num_rows() == 0 {
+        return stream::empty().boxed();
+    }
     // For convenience we really want this method to be a synchronous method where all
     // errors happen on the stream.  There is some async initialization that must happen
     // when creating a scheduler.  We wrap that all up in the very first task.
@@ -1709,6 +1723,11 @@ impl FilterExpression {
     pub fn no_filter() -> Self {
         Self(Bytes::new())
     }
+
+    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
+    pub fn is_noop(&self) -> bool {
+        self.0.is_empty()
+    }
 }
 
 /// A scheduler for a field's worth of data
@@ -1738,7 +1757,7 @@ impl FilterExpression {
 pub trait FieldScheduler: Send + Sync + std::fmt::Debug {
     /// Called at the beginning of scheduling to initialize the scheduler
     fn initialize<'a>(
-        &'a mut self,
+        &'a self,
         filter: &'a FilterExpression,
         context: &'a SchedulerContext,
     ) -> BoxFuture<'a, Result<()>>;
@@ -1856,11 +1875,15 @@ pub trait LogicalPageDecoder: std::fmt::Debug + Send {
 }
 
 /// Decodes a batch of data from an in-memory structure created by [`crate::encoder::encode_batch`]
-pub fn decode_batch(
+pub async fn decode_batch(
     batch: &EncodedBatch,
     filter: &FilterExpression,
     field_decoder_strategy: Arc<DecoderMiddlewareChain>,
 ) -> Result<RecordBatch> {
+    // The io is synchronous so it shouldn't be possible for any async stuff to still be in progress
+    // Still, if we just use now_or_never we hit misfires because some futures (channels) need to be
+    // polled twice.
+
     let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
     let cache = Arc::new(FileMetadataCache::with_capacity(
         128 * 1024 * 1024,
@@ -1875,23 +1898,13 @@ pub async fn decode_batch(
         field_decoder_strategy,
         io_scheduler.clone(),
         cache,
+        filter,
     )
-    // The io is synchronous so it shouldn't be possible for any async stuff to still be in progress
-    // and we use a lot of now_or_never instead
-    .now_or_never()
-    .unwrap()?;
+    .await?;
     let (tx, rx) = unbounded_channel();
     decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
     #[allow(clippy::single_range_in_vec_init)]
     let root_decoder = decode_scheduler.new_root_decoder_ranges(&[0..batch.num_rows]);
     let stream = BatchDecodeStream::new(rx, batch.num_rows as u32, batch.num_rows, root_decoder);
-    stream
-        .into_stream()
-        .next()
-        .now_or_never()
-        .unwrap()
-        .unwrap()
-        .task
-        .now_or_never()
-        .unwrap()
+    stream.into_stream().next().await.unwrap().task.await
 }
diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs
index c2afe88eaa..0a07e02991 100644
--- a/rust/lance-encoding/src/encoder.rs
+++ b/rust/lance-encoding/src/encoder.rs
@@ -744,6 +744,7 @@ impl BatchEncoder {
 /// An encoded batch of data and a page table describing it
 ///
 /// This is returned by [`crate::encoder::encode_batch`]
+#[derive(Debug)]
 pub struct EncodedBatch {
     pub data: Bytes,
     pub page_table: Vec<Arc<ColumnInfo>>,
diff --git a/rust/lance-encoding/src/encodings/logical/list.rs b/rust/lance-encoding/src/encodings/logical/list.rs
index 97a3f51063..9e6b9cd332 100644
--- a/rust/lance-encoding/src/encodings/logical/list.rs
+++ b/rust/lance-encoding/src/encodings/logical/list.rs
@@ -565,7 +565,7 @@ impl FieldScheduler for ListFieldScheduler {
     }
 
     fn initialize<'a>(
-        &'a mut self,
+        &'a self,
         _filter: &'a FilterExpression,
         _context: &'a SchedulerContext,
     ) -> BoxFuture<'a, Result<()>> {
diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs
index bd09daf969..946aee8100 100644
--- a/rust/lance-encoding/src/encodings/logical/primitive.rs
+++ b/rust/lance-encoding/src/encodings/logical/primitive.rs
@@ -227,7 +227,7 @@ impl FieldScheduler for PrimitiveFieldScheduler {
     }
 
     fn initialize<'a>(
-        &'a mut self,
+        &'a self,
         _filter: &'a FilterExpression,
         _context: &'a SchedulerContext,
     ) -> BoxFuture<'a, Result<()>> {
diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs
index 191a9566ca..00fe3effde 100644
--- a/rust/lance-encoding/src/encodings/logical/struct.rs
+++ b/rust/lance-encoding/src/encodings/logical/struct.rs
@@ -9,7 +9,7 @@ use std::{
 use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
 use arrow_schema::{DataType, Fields};
-use futures::{future::BoxFuture, FutureExt};
+use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
 use log::trace;
 use snafu::{location, Location};
@@ -201,12 +201,23 @@ impl FieldScheduler for SimpleStructScheduler {
     }
 
     fn initialize<'a>(
-        &'a mut self,
+        &'a self,
         _filter: &'a FilterExpression,
         _context: &'a SchedulerContext,
     ) -> BoxFuture<'a, Result<()>> {
-        // 2.0 schedulers do not need to initialize
-        std::future::ready(Ok(())).boxed()
+        let futures = self
+            .children
+            .iter()
+            .map(|child| child.initialize(_filter, _context))
+            .collect::<FuturesUnordered<_>>();
+        async move {
+            futures
+                .map(|res| res.map(|_| ()))
+                .try_collect::<Vec<_>>()
+                .await?;
+            Ok(())
+        }
+        .boxed()
     }
 }
diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs
index f8268a98e1..6b6c525249 100644
--- a/rust/lance-encoding/src/testing.rs
+++ b/rust/lance-encoding/src/testing.rs
@@ -144,6 +144,7 @@ async fn test_decode(
         decode_and_validate,
         io,
         cache,
+        &FilterExpression::no_filter(),
     )
     .await
     .unwrap();
diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs
index 6e15d9adf3..1980f4cabc 100644
--- a/rust/lance-file/src/v2/reader.rs
+++ b/rust/lance-file/src/v2/reader.rs
@@ -1224,6 +1224,7 @@ pub mod tests {
             &FilterExpression::no_filter(),
             Arc::<DecoderMiddlewareChain>::default(),
         )
+        .await
         .unwrap();
 
         assert_eq!(data, decoded);
@@ -1237,6 +1238,7 @@ pub mod tests {
             &FilterExpression::no_filter(),
             Arc::<DecoderMiddlewareChain>::default(),
         )
+        .await
         .unwrap();
 
         assert_eq!(data, decoded);
@@ -1524,6 +1526,7 @@ pub mod tests {
             Arc::<DecoderMiddlewareChain>::default(),
             file_reader.scheduler.clone(),
             test_cache(),
+            &FilterExpression::no_filter(),
         )
         .await
         .unwrap();
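
Usage note: the externally visible change in this patch is that `lance_encoding::decoder::decode_batch` is now `async`, so callers must await it; the Criterion benchmarks above drive it with `rt.block_on`. A minimal sketch of a migrated caller, assuming a tokio (or similar) runtime and an `EncodedBatch` already produced by `encode_batch`; the `roundtrip` helper name is illustrative, not part of the patch:

    use std::sync::Arc;

    use arrow_array::RecordBatch;
    use lance_core::Result;
    use lance_encoding::decoder::{decode_batch, DecoderMiddlewareChain, FilterExpression};
    use lance_encoding::encoder::EncodedBatch;

    // Hypothetical helper: decode an in-memory encoded batch with no pushdown filter.
    async fn roundtrip(encoded: &EncodedBatch) -> Result<RecordBatch> {
        decode_batch(
            encoded,
            &FilterExpression::no_filter(),
            Arc::<DecoderMiddlewareChain>::default(),
        )
        .await
    }

    // In synchronous code (e.g. a benchmark body), block on the future instead:
    //     let batch = rt.block_on(roundtrip(&encoded)).unwrap();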
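
Design note: `FieldScheduler::initialize` changes from `&'a mut self` to `&'a self` because schedulers are shared, so `ZoneMapsFieldScheduler` moves its lazily computed state into a `Mutex<Option<InitializedState>>` filled in through a shared reference. A minimal sketch of that interior-mutability pattern under hypothetical names (not the patch's exact types):

    use std::sync::Mutex;

    #[derive(Debug)]
    struct InitializedState {
        zone_maps: Vec<String>, // stand-in for the real ZoneMap/Expr/DFSchema state
    }

    #[derive(Debug, Default)]
    struct Scheduler {
        // None until initialize() runs; written through a shared reference.
        state: Mutex<Option<InitializedState>>,
    }

    impl Scheduler {
        // Takes &self, matching the new FieldScheduler::initialize signature.
        fn initialize(&self) {
            let state = InitializedState {
                zone_maps: vec!["zone 0".to_string()],
            };
            *self.state.lock().unwrap() = Some(state);
        }

        fn zone_count(&self) -> usize {
            // Mirrors the unwrap in create_filter: panics if initialize() never ran.
            self.state.lock().unwrap().as_ref().unwrap().zone_maps.len()
        }
    }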