-
Notifications
You must be signed in to change notification settings - Fork 245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: reconnect pushdown to v2 #2913
Conversation
Leaving in draft as I have a few tests to get passing but I wanted a CI run. |
866182d
to
8a7806b
Compare
@@ -48,38 +49,104 @@ impl SizedRecord { | |||
/// The cache is keyed by the file path and the type of metadata. | |||
#[derive(Clone, Debug)] | |||
pub struct FileMetadataCache { | |||
cache: Arc<Cache<(Path, TypeId), SizedRecord>>, | |||
cache: Option<Arc<Cache<(Path, TypeId), SizedRecord>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There were (and perhaps still are) a lot of spots that use Option<Cache>
with None
to represent "no cache". However, then you need special logic like...
let thing = if let Some(cache) {
cache.find_or_insert(key, load_fn)
} else {
load_fn()
}
Instead, I made it so you can cheaply create a FileMetadataCache::no_cache()
which just never inserts anything.
pub enum CapacityMode { | ||
Items, | ||
Bytes, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can back this out. I didn't really end up using it. At one point I thought there might be a dedicated cache for the v2 stuff (and we could prototype bytes mode with it) and then I ended up deciding to just use the existing dataset cache. Still, I do think we want to move that cache to bytes mode at some point but I didn't want to make this a breaking change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My recent debugging experiences have made me really want to push on doing size-based (bytes-based) eviction. Doing based on items isn't great for things that can be very large, like index partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Size based is much easier for users to wrap their heads around. Maybe we can switch everything over at some point. Though we will want some benchmarking (if its only calculating size on insert then it shouldn't be too terribly expensive but if it is recalculating size for existing objects every time something new gets inserted then it could be expensive).
use object_store::path::Path; | ||
|
||
pub trait LancePathExt { | ||
fn child_path(&self, path: &Path) -> Path; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't figure out how to join two object_store::Path
. If I missed something let me know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there is.
let (substrait_schema, input_schema) = | ||
if envelope.base_schema.as_ref().unwrap().r#struct.is_some() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The zone maps / substrait / datafusion stuff is a bit of a hack atm. I am trying really hard to avoid datafusion being a dependency of the file reader. Unfortunately, that means we need something to represent a filter expression in the file reader and so I'm using substrait.
However, lance (top level / dataset package) IS using datafusion for its filters. So this means we go from dataset filter (datafusion) to substrait and then (in the decoder plugin) back to a datafusion filter.
Ideally there would just be a lightweight expressions library I could use and, once I figure out cloning, it might be fun to build one.
In the meantime this hack is because we need to encode DF filters into substrait somehow and, when I do so (in encode_substrait
above) I've being lazy and not including the r#struct
field which we're using here to detect if extension types are used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally there would just be a lightweight expressions library I could use
One could argue that DataFusion's goal is to be modular like that, and datafusion-expr
could serve this purpose. However, I think the current list of dependencies make it not so lightweight: https://crates.io/crates/datafusion-expr/42.0.0/dependencies
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, if we can make sqlparser optional and get rid of arrow (arrow-buffer and arrow-array are fine) then it would probably be fine (I don't know how big those datafusion-...-common crates are but hopefully they're pretty small). I'd also want to drag the substrait expression parsing into the crate (under a feature flag probably). I'll add it to my potential list of xmas break projects 😆
@@ -467,14 +466,14 @@ impl<'a> DecoderMiddlewareChainCursor<'a> { | |||
} | |||
|
|||
pub struct ColumnInfoIter<'a> { | |||
column_infos: &'a [ColumnInfo], | |||
column_infos: Vec<Arc<ColumnInfo>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need the decoder to make a copy of the column metadata because it unwraps it (peels off the zone info) during decoding.
let check_scheduler = stream::unfold((), move |_| { | ||
let handle = scheduler_handle.take(); | ||
async move { | ||
if let Some(handle) = handle { | ||
handle.await.unwrap(); | ||
} | ||
None | ||
} | ||
}); | ||
stream.chain(check_scheduler).boxed() | ||
} | ||
|
||
async fn create_scheduler_decoder( | ||
column_infos: Vec<Arc<ColumnInfo>>, | ||
requested_rows: RequestedRows, | ||
filter: FilterExpression, | ||
column_indices: Vec<u32>, | ||
target_schema: Arc<Schema>, | ||
config: SchedulerDecoderConfig, | ||
) -> Result<BoxStream<'static, ReadBatchTask>> { | ||
let num_rows = requested_rows.num_rows(); | ||
|
||
let mut decode_scheduler = DecodeBatchScheduler::try_new( | ||
target_schema.as_ref(), | ||
&column_indices, | ||
&column_infos, | ||
&vec![], | ||
num_rows, | ||
config.decoder_strategy, | ||
config.io.clone(), | ||
config.cache, | ||
&filter, | ||
) | ||
.await?; | ||
|
||
let root_decoder = match &requested_rows { | ||
RequestedRows::Ranges(ranges) => decode_scheduler.new_root_decoder_ranges(ranges), | ||
RequestedRows::Indices(indices) => decode_scheduler.new_root_decoder_indices(indices), | ||
}; | ||
|
||
let (tx, rx) = mpsc::unbounded_channel(); | ||
|
||
let io = config.io; | ||
let scheduler_handle = tokio::task::spawn(async move { | ||
match requested_rows { | ||
RequestedRows::Ranges(ranges) => { | ||
decode_scheduler.schedule_ranges(&ranges, &filter, tx, io) | ||
} | ||
RequestedRows::Indices(indices) => { | ||
decode_scheduler.schedule_take(&indices, &filter, tx, io) | ||
} | ||
} | ||
}); | ||
|
||
let decode_stream = | ||
BatchDecodeStream::new(rx, config.batch_size, num_rows, root_decoder).into_stream(); | ||
|
||
Ok(check_scheduler_on_drop(decode_stream, scheduler_handle)) | ||
} | ||
|
||
/// Launches a scheduler on a dedicated (spawned) task and creates a decoder to | ||
/// decode the scheduled data and returns the decoder as a stream of record batches. | ||
/// | ||
/// This is a convenience function that creates both the scheduler and the decoder | ||
/// which can be a little tricky to get right. | ||
pub fn schedule_and_decode( | ||
column_infos: Vec<Arc<ColumnInfo>>, | ||
requested_rows: RequestedRows, | ||
filter: FilterExpression, | ||
column_indices: Vec<u32>, | ||
target_schema: Arc<Schema>, | ||
config: SchedulerDecoderConfig, | ||
) -> BoxStream<'static, ReadBatchTask> { | ||
if requested_rows.num_rows() == 0 { | ||
return stream::empty().boxed(); | ||
} | ||
// For convenience we really want this method to be a snchronous method where all | ||
// errors happen on the stream. There is some async initialization that must happen | ||
// when creating a scheduler. We wrap that all up in the very first task. | ||
stream::once(create_scheduler_decoder( | ||
column_infos, | ||
requested_rows, | ||
filter, | ||
column_indices, | ||
target_schema, | ||
config, | ||
)) | ||
.map(|maybe_stream| match maybe_stream { | ||
// If the initialization failed make it look like a failed task | ||
Ok(stream) => stream, | ||
Err(e) => stream::once(std::future::ready(ReadBatchTask { | ||
num_rows: 0, | ||
task: std::future::ready(Err(e)).boxed(), | ||
})) | ||
.boxed(), | ||
}) | ||
.flatten() | ||
.boxed() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is mostly a port of code that was in FileReader
(there were actually two copies of it in FileReader
). It's been slightly changed to now call initialize
after creating the DecodeBatchScheduler
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excited to have this back.
I was thinking about it, and realized the place where users will probably notice the lack of statistics is in delete queries, where they often do things like id in (1, 2, 3)
. So good we will have this in V2 soon.
let path = if let Some(base_path) = &self.base_path { | ||
temp = base_path.child_path(path); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not worth optimizing yet, but I will note that I've seen flamegraphs of servers with high throughput show the path construction before getting something in a cache as a small hotstop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think string cloning and string building has the potential to become a very difficult to extract hotspot. Not entirely sure how to avoid it other than making cache keys configurable. There's a lot of pieces that will be making up these hash entries:
File index
Column index
Encoding (e.g. stats)
Specific item (e.g. zone maps)
That can probably be a Lance 2.2 problem 😆 (though this is just joking, we can change the cache keys without any change to format / backwards compatibility / etc.)
use object_store::path::Path; | ||
|
||
pub trait LancePathExt { | ||
fn child_path(&self, path: &Path) -> Path; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there is.
let (substrait_schema, input_schema) = | ||
if envelope.base_schema.as_ref().unwrap().r#struct.is_some() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally there would just be a lightweight expressions library I could use
One could argue that DataFusion's goal is to be modular like that, and datafusion-expr
could serve this purpose. However, I think the current list of dependencies make it not so lightweight: https://crates.io/crates/datafusion-expr/42.0.0/dependencies
Add initialization method to v2 scheduler Reconnect zone maps using cache and initialization
8a7806b
to
750a1f2
Compare
) In #2913 we added an initialization routine to the scheduler. Unfortunately, this caused a problem. It put `schedule_ranges` behind an await which means that we wouldn't schedule ranges for all files in a scan immediately. Instead, we wouldn't begin scheduling file X until we had reached nearly the last batch of file X-1. This introduced stuttering in the read path and had a significant effect on performance. The fix here moves the initialization into the dedicated scheduler thread and keeps the creation of the scheduler a synchronous task which is actually a bit simpler anyways I think.
There is still a bit more testing work to do before pushdown is fully supported in v2 and until we start using
LanceDfFieldDecoderStrategy
in the file reader it won't be accessible to users. However, this PR has a number of structural refactors for v2 and is big enough as it is.This adds a cache to the v2 schedulers. This is needed in this PR because we want to use the cache to store zone maps. However, it will be needed in future 2.1 work as well because we want to cache things like "rows per chunk" and "dictionaries".
This adds an initialization routine to v2 schedulers. Again, this is needed for zone maps but will also be used by 2.1 features.
Lastly, this PR does, in fact, reconnect the zone maps feature, restoring blocks that had been commented out.