Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reconnect pushdown to v2 #2913

Merged
merged 3 commits into from
Sep 27, 2024

Conversation

westonpace
Copy link
Contributor

@westonpace westonpace commented Sep 19, 2024

There is still a bit more testing work to do before pushdown is fully supported in v2 and until we start using LanceDfFieldDecoderStrategy in the file reader it won't be accessible to users. However, this PR has a number of structural refactors for v2 and is big enough as it is.

This adds a cache to the v2 schedulers. This is needed in this PR because we want to use the cache to store zone maps. However, it will be needed in future 2.1 work as well because we want to cache things like "rows per chunk" and "dictionaries".

This adds an initialization routine to v2 schedulers. Again, this is needed for zone maps but will also be used by 2.1 features.

Lastly, this PR does, in fact, reconnect the zone maps feature, restoring blocks that had been commented out.

@westonpace westonpace marked this pull request as draft September 19, 2024 14:43
@github-actions github-actions bot added enhancement New feature or request python labels Sep 19, 2024
@westonpace
Copy link
Contributor Author

Leaving in draft as I have a few tests to get passing but I wanted a CI run.

@westonpace westonpace force-pushed the feat/reconnect-pushdown branch from 866182d to 8a7806b Compare September 20, 2024 20:59
@codecov-commenter
Copy link

codecov-commenter commented Sep 20, 2024

Codecov Report

Attention: Patch coverage is 87.33553% with 77 lines in your changes missing coverage. Please review.

Please upload report for BASE (main@7cc14d9). Learn more about missing BASE report.

Files with missing lines Patch % Lines
rust/lance-index/src/vector/ivf/shuffler.rs 10.00% 18 Missing ⚠️
rust/lance-encoding/src/decoder.rs 89.44% 11 Missing and 6 partials ⚠️
rust/lance-encoding-datafusion/src/zone.rs 90.22% 4 Missing and 9 partials ⚠️
rust/lance-core/src/cache.rs 80.76% 6 Missing and 4 partials ⚠️
rust/lance-core/src/utils/path.rs 0.00% 7 Missing ⚠️
rust/lance-file/src/v2/reader.rs 95.34% 0 Missing and 4 partials ⚠️
rust/lance-datafusion/src/substrait.rs 90.00% 1 Missing and 1 partial ⚠️
rust/lance-encoding-datafusion/src/substrait.rs 50.00% 1 Missing and 1 partial ⚠️
rust/lance-encoding-datafusion/src/lib.rs 95.00% 0 Missing and 1 partial ⚠️
...ust/lance-encoding/src/encodings/logical/struct.rs 94.73% 1 Missing ⚠️
... and 2 more
Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2913   +/-   ##
=======================================
  Coverage        ?   78.52%           
=======================================
  Files           ?      232           
  Lines           ?    70897           
  Branches        ?    70897           
=======================================
  Hits            ?    55674           
  Misses          ?    12271           
  Partials        ?     2952           
Flag Coverage Δ
unittests 78.52% <87.33%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@westonpace westonpace marked this pull request as ready for review September 20, 2024 21:27
@@ -48,38 +49,104 @@ impl SizedRecord {
/// The cache is keyed by the file path and the type of metadata.
#[derive(Clone, Debug)]
pub struct FileMetadataCache {
cache: Arc<Cache<(Path, TypeId), SizedRecord>>,
cache: Option<Arc<Cache<(Path, TypeId), SizedRecord>>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were (and perhaps still are) a lot of spots that use Option<Cache> with None to represent "no cache". However, then you need special logic like...

let thing = if let Some(cache) {
  cache.find_or_insert(key, load_fn)
} else {
  load_fn()
}

Instead, I made it so you can cheaply create a FileMetadataCache::no_cache() which just never inserts anything.

Comment on lines +70 to +73
pub enum CapacityMode {
Items,
Bytes,
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can back this out. I didn't really end up using it. At one point I thought there might be a dedicated cache for the v2 stuff (and we could prototype bytes mode with it) and then I ended up deciding to just use the existing dataset cache. Still, I do think we want to move that cache to bytes mode at some point but I didn't want to make this a breaking change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My recent debugging experiences have made me really want to push on doing size-based (bytes-based) eviction. Doing based on items isn't great for things that can be very large, like index partitions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Size based is much easier for users to wrap their heads around. Maybe we can switch everything over at some point. Though we will want some benchmarking (if its only calculating size on insert then it shouldn't be too terribly expensive but if it is recalculating size for existing objects every time something new gets inserted then it could be expensive).

use object_store::path::Path;

pub trait LancePathExt {
fn child_path(&self, path: &Path) -> Path;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't figure out how to join two object_store::Path. If I missed something let me know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is.

Comment on lines +274 to +275
let (substrait_schema, input_schema) =
if envelope.base_schema.as_ref().unwrap().r#struct.is_some() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The zone maps / substrait / datafusion stuff is a bit of a hack atm. I am trying really hard to avoid datafusion being a dependency of the file reader. Unfortunately, that means we need something to represent a filter expression in the file reader and so I'm using substrait.

However, lance (top level / dataset package) IS using datafusion for its filters. So this means we go from dataset filter (datafusion) to substrait and then (in the decoder plugin) back to a datafusion filter.

Ideally there would just be a lightweight expressions library I could use and, once I figure out cloning, it might be fun to build one.

In the meantime this hack is because we need to encode DF filters into substrait somehow and, when I do so (in encode_substrait above) I've being lazy and not including the r#struct field which we're using here to detect if extension types are used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally there would just be a lightweight expressions library I could use

One could argue that DataFusion's goal is to be modular like that, and datafusion-expr could serve this purpose. However, I think the current list of dependencies make it not so lightweight: https://crates.io/crates/datafusion-expr/42.0.0/dependencies

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, if we can make sqlparser optional and get rid of arrow (arrow-buffer and arrow-array are fine) then it would probably be fine (I don't know how big those datafusion-...-common crates are but hopefully they're pretty small). I'd also want to drag the substrait expression parsing into the crate (under a feature flag probably). I'll add it to my potential list of xmas break projects 😆

@@ -467,14 +466,14 @@ impl<'a> DecoderMiddlewareChainCursor<'a> {
}

pub struct ColumnInfoIter<'a> {
column_infos: &'a [ColumnInfo],
column_infos: Vec<Arc<ColumnInfo>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need the decoder to make a copy of the column metadata because it unwraps it (peels off the zone info) during decoding.

Comment on lines +1330 to +1437
let check_scheduler = stream::unfold((), move |_| {
let handle = scheduler_handle.take();
async move {
if let Some(handle) = handle {
handle.await.unwrap();
}
None
}
});
stream.chain(check_scheduler).boxed()
}

async fn create_scheduler_decoder(
column_infos: Vec<Arc<ColumnInfo>>,
requested_rows: RequestedRows,
filter: FilterExpression,
column_indices: Vec<u32>,
target_schema: Arc<Schema>,
config: SchedulerDecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
let num_rows = requested_rows.num_rows();

let mut decode_scheduler = DecodeBatchScheduler::try_new(
target_schema.as_ref(),
&column_indices,
&column_infos,
&vec![],
num_rows,
config.decoder_strategy,
config.io.clone(),
config.cache,
&filter,
)
.await?;

let root_decoder = match &requested_rows {
RequestedRows::Ranges(ranges) => decode_scheduler.new_root_decoder_ranges(ranges),
RequestedRows::Indices(indices) => decode_scheduler.new_root_decoder_indices(indices),
};

let (tx, rx) = mpsc::unbounded_channel();

let io = config.io;
let scheduler_handle = tokio::task::spawn(async move {
match requested_rows {
RequestedRows::Ranges(ranges) => {
decode_scheduler.schedule_ranges(&ranges, &filter, tx, io)
}
RequestedRows::Indices(indices) => {
decode_scheduler.schedule_take(&indices, &filter, tx, io)
}
}
});

let decode_stream =
BatchDecodeStream::new(rx, config.batch_size, num_rows, root_decoder).into_stream();

Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
}

/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to
/// decode the scheduled data and returns the decoder as a stream of record batches.
///
/// This is a convenience function that creates both the scheduler and the decoder
/// which can be a little tricky to get right.
pub fn schedule_and_decode(
column_infos: Vec<Arc<ColumnInfo>>,
requested_rows: RequestedRows,
filter: FilterExpression,
column_indices: Vec<u32>,
target_schema: Arc<Schema>,
config: SchedulerDecoderConfig,
) -> BoxStream<'static, ReadBatchTask> {
if requested_rows.num_rows() == 0 {
return stream::empty().boxed();
}
// For convenience we really want this method to be a snchronous method where all
// errors happen on the stream. There is some async initialization that must happen
// when creating a scheduler. We wrap that all up in the very first task.
stream::once(create_scheduler_decoder(
column_infos,
requested_rows,
filter,
column_indices,
target_schema,
config,
))
.map(|maybe_stream| match maybe_stream {
// If the initialization failed make it look like a failed task
Ok(stream) => stream,
Err(e) => stream::once(std::future::ready(ReadBatchTask {
num_rows: 0,
task: std::future::ready(Err(e)).boxed(),
}))
.boxed(),
})
.flatten()
.boxed()
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is mostly a port of code that was in FileReader (there were actually two copies of it in FileReader). It's been slightly changed to now call initialize after creating the DecodeBatchScheduler.

Copy link
Contributor

@wjones127 wjones127 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excited to have this back.

I was thinking about it, and realized the place where users will probably notice the lack of statistics is in delete queries, where they often do things like id in (1, 2, 3). So good we will have this in V2 soon.

Comment on lines +129 to +130
let path = if let Some(base_path) = &self.base_path {
temp = base_path.child_path(path);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not worth optimizing yet, but I will note that I've seen flamegraphs of servers with high throughput show the path construction before getting something in a cache as a small hotstop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think string cloning and string building has the potential to become a very difficult to extract hotspot. Not entirely sure how to avoid it other than making cache keys configurable. There's a lot of pieces that will be making up these hash entries:

File index
Column index
Encoding (e.g. stats)
Specific item (e.g. zone maps)

That can probably be a Lance 2.2 problem 😆 (though this is just joking, we can change the cache keys without any change to format / backwards compatibility / etc.)

use object_store::path::Path;

pub trait LancePathExt {
fn child_path(&self, path: &Path) -> Path;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is.

Comment on lines +274 to +275
let (substrait_schema, input_schema) =
if envelope.base_schema.as_ref().unwrap().r#struct.is_some() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally there would just be a lightweight expressions library I could use

One could argue that DataFusion's goal is to be modular like that, and datafusion-expr could serve this purpose. However, I think the current list of dependencies make it not so lightweight: https://crates.io/crates/datafusion-expr/42.0.0/dependencies

Add initialization method to v2 scheduler
Reconnect zone maps using cache and initialization
@westonpace westonpace force-pushed the feat/reconnect-pushdown branch from 8a7806b to 750a1f2 Compare September 27, 2024 14:11
@westonpace westonpace merged commit d97a93d into lancedb:main Sep 27, 2024
22 checks passed
westonpace added a commit that referenced this pull request Nov 25, 2024
)

In #2913 we added an initialization
routine to the scheduler. Unfortunately, this caused a problem. It put
`schedule_ranges` behind an await which means that we wouldn't schedule
ranges for all files in a scan immediately. Instead, we wouldn't begin
scheduling file X until we had reached nearly the last batch of file
X-1. This introduced stuttering in the read path and had a significant
effect on performance.

The fix here moves the initialization into the dedicated scheduler
thread and keeps the creation of the scheduler a synchronous task which
is actually a bit simpler anyways I think.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request python
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants