diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index dd0394df17..552abd6801 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -1696,7 +1696,7 @@ pub fn create_decode_stream( } } -async fn create_scheduler_decoder( +fn create_scheduler_decoder( column_infos: Vec>, requested_rows: RequestedRows, filter: FilterExpression, @@ -1706,19 +1706,6 @@ async fn create_scheduler_decoder( ) -> Result> { let num_rows = requested_rows.num_rows(); - let mut decode_scheduler = DecodeBatchScheduler::try_new( - target_schema.as_ref(), - &column_indices, - &column_infos, - &vec![], - num_rows, - config.decoder_plugins, - config.io.clone(), - config.cache, - &filter, - ) - .await?; - let is_structural = column_infos[0].is_structural(); let (tx, rx) = mpsc::unbounded_channel(); @@ -1732,14 +1719,33 @@ async fn create_scheduler_decoder( rx, ); - let io = config.io; let scheduler_handle = tokio::task::spawn(async move { + let mut decode_scheduler = match DecodeBatchScheduler::try_new( + target_schema.as_ref(), + &column_indices, + &column_infos, + &vec![], + num_rows, + config.decoder_plugins, + config.io.clone(), + config.cache, + &filter, + ) + .await + { + Ok(scheduler) => scheduler, + Err(e) => { + let _ = tx.send(Err(e)); + return; + } + }; + match requested_rows { RequestedRows::Ranges(ranges) => { - decode_scheduler.schedule_ranges(&ranges, &filter, tx, io) + decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io) } RequestedRows::Indices(indices) => { - decode_scheduler.schedule_take(&indices, &filter, tx, io) + decode_scheduler.schedule_take(&indices, &filter, tx, config.io) } } }); @@ -1766,15 +1772,14 @@ pub fn schedule_and_decode( // For convenience we really want this method to be a snchronous method where all // errors happen on the stream. There is some async initialization that must happen // when creating a scheduler. We wrap that all up in the very first task. - stream::once(create_scheduler_decoder( + match create_scheduler_decoder( column_infos, requested_rows, filter, column_indices, target_schema, config, - )) - .map(|maybe_stream| match maybe_stream { + ) { // If the initialization failed make it look like a failed task Ok(stream) => stream, Err(e) => stream::once(std::future::ready(ReadBatchTask { @@ -1782,9 +1787,7 @@ pub fn schedule_and_decode( task: std::future::ready(Err(e)).boxed(), })) .boxed(), - }) - .flatten() - .boxed() + } } /// A decoder for single-column encodings of primitive data (this includes fixed size