Commit d196ab8

fix: fix performance regression introduced during reader refactor (#3170)

In #2913 we added an initialization
routine to the scheduler. Unfortunately, this caused a problem: it put
`schedule_ranges` behind an await, which meant we wouldn't schedule
ranges for all files in a scan immediately. Instead, we wouldn't begin
scheduling file X until we had reached nearly the last batch of file
X-1. This introduced stuttering in the read path and had a significant
effect on performance.
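
To make the mechanics concrete, here is a minimal standalone sketch (plain `futures`/`tokio`, not the actual decoder types) of why the await delayed scheduling: `stream::once` is lazy, so nothing inside it runs until the stream is first polled, and in a multi-file scan that poll only happens once the previous file's stream is nearly drained.

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // The regressed shape: the async setup and the scheduling call live
    // inside a `stream::once` future, so they run lazily.
    let batches = stream::once(async {
        println!("scheduling starts only now, at the first poll");
        stream::iter(vec![1, 2, 3]) // stand-in for the decoded batch tasks
    })
    .flatten();

    println!("stream created; no ranges have been scheduled yet");
    // Only this poll kicks off the work above.
    let collected: Vec<i32> = batches.collect().await;
    println!("{collected:?}");
}
```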

The fix here moves the initialization into the dedicated scheduler
thread and keeps the creation of the scheduler a synchronous task,
which is actually a bit simpler anyway, I think.
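
Roughly, the new shape looks like this (a simplified, self-contained sketch with hypothetical stand-in types, not the real `DecodeBatchScheduler` API): construction stays synchronous, the async init happens inside the spawned scheduler task, and an init error is forwarded on the channel so it surfaces as a failed item on the stream.

```rust
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

// Hypothetical stand-in for the real scheduler type.
struct Scheduler;
impl Scheduler {
    async fn try_new() -> Result<Self, String> {
        Ok(Scheduler)
    }
    fn schedule_range(&mut self, range: std::ops::Range<u64>) -> u64 {
        range.end - range.start
    }
}

// Synchronous constructor: the caller gets a stream back immediately, and the
// dedicated task starts scheduling right away instead of waiting to be polled.
fn schedule_and_decode(
    ranges: Vec<std::ops::Range<u64>>,
) -> UnboundedReceiverStream<Result<u64, String>> {
    let (tx, rx) = mpsc::unbounded_channel();
    tokio::task::spawn(async move {
        // The async initialization now lives inside the scheduler task.
        let mut scheduler = match Scheduler::try_new().await {
            Ok(scheduler) => scheduler,
            Err(e) => {
                // Make an init failure look like a failed task on the stream.
                let _ = tx.send(Err(e));
                return;
            }
        };
        for range in ranges {
            let _ = tx.send(Ok(scheduler.schedule_range(range)));
        }
    });
    UnboundedReceiverStream::new(rx)
}

#[tokio::main]
async fn main() {
    use futures::StreamExt;
    let stream = schedule_and_decode(vec![0..10, 10..20]);
    let scheduled: Vec<_> = stream.collect().await;
    println!("{scheduled:?}");
}
```
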
westonpace authored Nov 25, 2024
1 parent bfd8ec9 commit d196ab8
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions rust/lance-encoding/src/decoder.rs
@@ -1696,7 +1696,7 @@ pub fn create_decode_stream(
     }
 }
 
-async fn create_scheduler_decoder(
+fn create_scheduler_decoder(
     column_infos: Vec<Arc<ColumnInfo>>,
     requested_rows: RequestedRows,
     filter: FilterExpression,
@@ -1706,19 +1706,6 @@ async fn create_scheduler_decoder(
 ) -> Result<BoxStream<'static, ReadBatchTask>> {
     let num_rows = requested_rows.num_rows();
 
-    let mut decode_scheduler = DecodeBatchScheduler::try_new(
-        target_schema.as_ref(),
-        &column_indices,
-        &column_infos,
-        &vec![],
-        num_rows,
-        config.decoder_plugins,
-        config.io.clone(),
-        config.cache,
-        &filter,
-    )
-    .await?;
-
     let is_structural = column_infos[0].is_structural();
 
     let (tx, rx) = mpsc::unbounded_channel();
@@ -1732,14 +1719,33 @@ async fn create_scheduler_decoder(
         rx,
     );
 
-    let io = config.io;
     let scheduler_handle = tokio::task::spawn(async move {
+        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
+            target_schema.as_ref(),
+            &column_indices,
+            &column_infos,
+            &vec![],
+            num_rows,
+            config.decoder_plugins,
+            config.io.clone(),
+            config.cache,
+            &filter,
+        )
+        .await
+        {
+            Ok(scheduler) => scheduler,
+            Err(e) => {
+                let _ = tx.send(Err(e));
+                return;
+            }
+        };
+
         match requested_rows {
             RequestedRows::Ranges(ranges) => {
-                decode_scheduler.schedule_ranges(&ranges, &filter, tx, io)
+                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
             }
             RequestedRows::Indices(indices) => {
-                decode_scheduler.schedule_take(&indices, &filter, tx, io)
+                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
             }
         }
     });
@@ -1766,25 +1772,22 @@ pub fn schedule_and_decode(
     // For convenience we really want this method to be a snchronous method where all
     // errors happen on the stream. There is some async initialization that must happen
     // when creating a scheduler. We wrap that all up in the very first task.
-    stream::once(create_scheduler_decoder(
+    match create_scheduler_decoder(
         column_infos,
         requested_rows,
         filter,
         column_indices,
         target_schema,
         config,
-    ))
-    .map(|maybe_stream| match maybe_stream {
+    ) {
         // If the initialization failed make it look like a failed task
         Ok(stream) => stream,
         Err(e) => stream::once(std::future::ready(ReadBatchTask {
             num_rows: 0,
             task: std::future::ready(Err(e)).boxed(),
         }))
         .boxed(),
-    })
-    .flatten()
-    .boxed()
+    }
 }
 
 /// A decoder for single-column encodings of primitive data (this includes fixed size
