feat: reconnect pushdown to v2 #2913
```diff
@@ -11,6 +11,7 @@ use futures::Future;
 use moka::sync::Cache;
 use object_store::path::Path;
 
+use crate::utils::path::LancePathExt;
 use crate::Result;
 
 pub const DEFAULT_INDEX_CACHE_SIZE: usize = 128;
@@ -21,7 +22,7 @@ type ArcAny = Arc<dyn Any + Send + Sync>;
 #[derive(Clone)]
 struct SizedRecord {
     record: ArcAny,
-    size_accessor: Arc<dyn Fn(ArcAny) -> usize + Send + Sync>,
+    size_accessor: Arc<dyn Fn(&ArcAny) -> usize + Send + Sync>,
 }
 
 impl std::fmt::Debug for SizedRecord {
@@ -35,7 +36,7 @@ impl std::fmt::Debug for SizedRecord {
 impl SizedRecord {
     fn new<T: DeepSizeOf + Send + Sync + 'static>(record: Arc<T>) -> Self {
         let size_accessor =
-            |record: ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() };
+            |record: &ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() };
         Self {
             record,
             size_accessor: Arc::new(size_accessor),
@@ -48,38 +49,104 @@ impl SizedRecord {
 /// The cache is keyed by the file path and the type of metadata.
 #[derive(Clone, Debug)]
 pub struct FileMetadataCache {
-    cache: Arc<Cache<(Path, TypeId), SizedRecord>>,
+    cache: Option<Arc<Cache<(Path, TypeId), SizedRecord>>>,
+    base_path: Option<Path>,
 }
 
 impl DeepSizeOf for FileMetadataCache {
     fn deep_size_of_children(&self, _: &mut Context) -> usize {
         self.cache
-            .iter()
-            .map(|(_, v)| (v.size_accessor)(v.record))
-            .sum()
+            .as_ref()
+            .map(|cache| {
+                cache
+                    .iter()
+                    .map(|(_, v)| (v.size_accessor)(&v.record))
+                    .sum()
+            })
+            .unwrap_or(0)
     }
 }
 
+pub enum CapacityMode {
+    Items,
+    Bytes,
+}
```
**Comment on lines +70 to +73** (`CapacityMode`):

> I can back this out; I didn't really end up using it. At one point I thought there might be a dedicated cache for the v2 stuff (and we could prototype bytes mode with it), but then I decided to just use the existing dataset cache. Still, I do think we want to move that cache to bytes mode at some point, but I didn't want to make this a breaking change.

> My recent debugging experiences have made me really want to push on size-based (bytes-based) eviction. Evicting by item count isn't great for things that can be very large, like index partitions.

> Agreed. Size-based eviction is much easier for users to wrap their heads around; maybe we can switch everything over at some point. We will want some benchmarking first, though: if the size is only calculated on insert it shouldn't be too terribly expensive, but if sizes of existing objects are recalculated every time something new is inserted it could be.
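To ground the benchmarking question, here is a minimal sketch of bytes-mode eviction with moka; the key and value types are placeholders, not Lance's. As far as moka's documented behavior goes, the weigher runs once when an entry is inserted and the resulting weight is stored with the entry, so existing entries are not re-weighed as new ones arrive:

```rust
use moka::sync::Cache;

fn main() {
    // The weigher runs once per insert; the returned weight (here, payload
    // bytes) is recorded with the entry rather than recomputed later.
    let cache: Cache<String, Vec<u8>> = Cache::builder()
        .weigher(|_key, value: &Vec<u8>| value.len().try_into().unwrap_or(u32::MAX))
        // With a weigher installed, max_capacity is interpreted in the
        // weigher's units (bytes) instead of an entry count.
        .max_capacity(64 * 1024 * 1024)
        .build();

    cache.insert("index-partition-0".to_string(), vec![0u8; 1024]);
    assert!(cache.get("index-partition-0").is_some());
}
```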
```diff
 impl FileMetadataCache {
+    /// Instantiates a new cache which, for legacy reasons, uses Items capacity mode.
     pub fn new(capacity: usize) -> Self {
         Self {
-            cache: Arc::new(Cache::new(capacity as u64)),
+            cache: Some(Arc::new(Cache::new(capacity as u64))),
+            base_path: None,
         }
     }
 
+    /// Instantiates a dummy cache that will never cache anything.
+    pub fn no_cache() -> Self {
+        Self {
+            cache: None,
+            base_path: None,
+        }
+    }
+
+    /// Instantiates a new cache with a given capacity and capacity mode.
+    pub fn with_capacity(capacity: usize, mode: CapacityMode) -> Self {
+        match mode {
+            CapacityMode::Items => Self::new(capacity),
+            CapacityMode::Bytes => Self {
+                cache: Some(Arc::new(
+                    Cache::builder()
+                        .weigher(|_, v: &SizedRecord| {
+                            (v.size_accessor)(&v.record).try_into().unwrap_or(u32::MAX)
+                        })
+                        .build(),
+                )),
+                base_path: None,
+            },
+        }
+    }
+
+    /// Creates a new cache which shares the same underlying cache but prepends `base_path` to all
+    /// keys.
+    pub fn with_base_path(&self, base_path: Path) -> Self {
+        Self {
+            cache: self.cache.clone(),
+            base_path: Some(base_path),
+        }
+    }
+
     pub fn size(&self) -> usize {
-        self.cache.entry_count() as usize
+        if let Some(cache) = self.cache.as_ref() {
+            cache.entry_count() as usize
+        } else {
+            0
+        }
     }
 
     pub fn get<T: Send + Sync + 'static>(&self, path: &Path) -> Option<Arc<T>> {
-        self.cache
+        let cache = self.cache.as_ref()?;
+        let temp: Path;
+        let path = if let Some(base_path) = &self.base_path {
+            temp = base_path.child_path(path);
```
**Comment on lines +129 to +130** (path construction in `get`):

> Not worth optimizing yet, but I will note that I've seen flamegraphs from high-throughput servers where the path construction ahead of a cache lookup shows up as a small hotspot.

> Yeah, I think string cloning and string building have the potential to become a hotspot that is very difficult to extract. I'm not entirely sure how to avoid it other than making cache keys configurable, and there are a lot of pieces that will make up these hash entries (file index, …). That can probably be a Lance 2.2 problem 😆 (just joking; we can change the cache keys without any change to the format, backwards compatibility, etc.)
```diff
+            &temp
+        } else {
+            path
+        };
+        cache
             .get(&(path.to_owned(), TypeId::of::<T>()))
             .map(|metadata| metadata.record.clone().downcast::<T>().unwrap())
     }
 
     pub fn insert<T: DeepSizeOf + Send + Sync + 'static>(&self, path: Path, metadata: Arc<T>) {
-        self.cache
-            .insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
+        let Some(cache) = self.cache.as_ref() else {
+            return;
+        };
+        let path = if let Some(base_path) = &self.base_path {
+            base_path.child_path(&path)
+        } else {
+            path
+        };
+        cache.insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
     }
 
     /// Get an item
```
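To make the new scoping concrete, here is a hypothetical call site for `with_base_path`; this is a sketch against the API in this diff, and the `lance_core::cache` import path and the `u64` stand-in metadata type are assumptions. Note that the prefix join happens on every `get` and `insert`, which is exactly the per-lookup path construction flagged in the comment above:

```rust
use std::sync::Arc;

use object_store::path::Path;

// Assumed import path for the cache modified in this diff.
use lance_core::cache::FileMetadataCache;

fn example() {
    let cache = FileMetadataCache::new(128);

    // Shares the same underlying moka cache, but every key is prefixed with
    // the file's path, so metadata from different files cannot collide.
    let scoped = cache.with_base_path(Path::from("dataset/data/file-0.lance"));

    // Stored internally under ("dataset/data/file-0.lance/page-table", TypeId of u64).
    scoped.insert(Path::from("page-table"), Arc::new(42_u64));
    assert_eq!(
        scoped.get::<u64>(&Path::from("page-table")),
        Some(Arc::new(42_u64))
    );
}
```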
New file introducing `LancePathExt`:

```diff
@@ -0,0 +1,18 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use object_store::path::Path;
+
+pub trait LancePathExt {
+    fn child_path(&self, path: &Path) -> Path;
+}
+
+impl LancePathExt for Path {
+    fn child_path(&self, path: &Path) -> Path {
+        let mut new_path = self.clone();
+        for part in path.parts() {
+            new_path = new_path.child(part);
+        }
+        new_path
+    }
+}
```

**Comment on `child_path`:**

> I couldn't figure out how to join two `Path`s.

> I don't think there is a built-in way.
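A quick sanity check of the joining behavior; this sketch assumes the extension trait above is importable from a `lance_core::utils::path` module, which is an assumption about the crate layout:

```rust
use object_store::path::Path;

// Assumed import path for the extension trait above.
use lance_core::utils::path::LancePathExt;

fn main() {
    let base = Path::from("tables/my_table");
    let suffix = Path::from("indices/vector.idx");

    // parts() yields "indices" then "vector.idx"; each part is appended to
    // the accumulator in turn, producing the concatenated path.
    let joined = base.child_path(&suffix);
    assert_eq!(joined.as_ref(), "tables/my_table/indices/vector.idx");
}
```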
```diff
@@ -271,22 +271,31 @@ pub async fn parse_substrait(expr: &[u8], input_schema: Arc<Schema>) -> Result<E
         }),
     }?;
 
-    let (substrait_schema, input_schema, index_mapping) =
-        remove_extension_types(envelope.base_schema.as_ref().unwrap(), input_schema.clone())?;
+    let (substrait_schema, input_schema) =
+        if envelope.base_schema.as_ref().unwrap().r#struct.is_some() {
+            let (substrait_schema, input_schema, index_mapping) = remove_extension_types(
+                envelope.base_schema.as_ref().unwrap(),
+                input_schema.clone(),
+            )?;
 
-    if substrait_schema.r#struct.as_ref().unwrap().types.len()
-        != envelope
-            .base_schema
-            .as_ref()
-            .unwrap()
-            .r#struct
-            .as_ref()
-            .unwrap()
-            .types
-            .len()
-    {
-        remap_expr_references(&mut expr, &index_mapping)?;
-    }
+            if substrait_schema.r#struct.as_ref().unwrap().types.len()
+                != envelope
+                    .base_schema
+                    .as_ref()
+                    .unwrap()
+                    .r#struct
+                    .as_ref()
+                    .unwrap()
+                    .types
+                    .len()
+            {
+                remap_expr_references(&mut expr, &index_mapping)?;
+            }
+
+            (substrait_schema, input_schema)
+        } else {
+            (envelope.base_schema.as_ref().unwrap().clone(), input_schema)
+        };
 
     // Datafusion's substrait consumer only supports Plan (not ExtendedExpression) and so
     // we need to create a dummy plan with a single project node
```

**Comment on lines +274 to +275:**

> The zone maps / substrait / datafusion stuff is a bit of a hack at the moment. I am trying really hard to avoid DataFusion being a dependency of the file reader. Unfortunately, that means we need something to represent a filter expression in the file reader, and so I'm using Substrait. However, lance (the top-level dataset package) IS using DataFusion for its filters. So this means we go from a dataset filter (DataFusion) to Substrait and then (in the decoder plugin) back to a DataFusion filter. Ideally there would just be a lightweight expressions library I could use and, once I figure out cloning, it might be fun to build one. In the meantime, this hack is because we need to encode DF filters into Substrait somehow and, when I do so (in …)

> One could argue that DataFusion's goal is to be modular like that, and …

> Hmm, if we can make sqlparser optional and get rid of arrow (arrow-buffer and arrow-array are fine) then it would probably be fine (I don't know how big those datafusion-...-common crates are, but hopefully they're pretty small). I'd also want to drag the substrait expression parsing into the crate (probably under a feature flag). I'll add it to my potential list of xmas break projects 😆
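For orientation, here is the round trip described in the thread above, sketched with the datafusion-substrait crate. The producer/consumer entry points exist in that crate, but their exact signatures have shifted across DataFusion versions, so treat this block as an assumption-laden illustration rather than the code Lance ships:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
// Producer/consumer entry points from the datafusion-substrait crate; the
// argument shapes below are assumptions tied to a particular version.
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;

async fn filter_round_trip(ctx: &SessionContext) -> Result<()> {
    // A dataset filter starts life as a DataFusion logical plan...
    let plan = ctx
        .sql("SELECT * FROM t WHERE x > 5")
        .await?
        .into_optimized_plan()?;

    // ...is encoded to Substrait so the file reader can stay DataFusion-free...
    let substrait_plan = to_substrait_plan(&plan, ctx)?;

    // ...and is decoded back into a DataFusion plan inside the decoder plugin.
    let roundtripped = from_substrait_plan(ctx, &substrait_plan).await?;
    println!("{}", roundtripped.display_indent());
    Ok(())
}
```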
**General comment:**

> There were (and perhaps still are) a lot of spots that use `Option<Cache>` with `None` to represent "no cache". However, then you need special logic like … Instead, I made it so you can cheaply create a `FileMetadataCache::no_cache()` which just never inserts anything.
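A sketch of the ergonomic difference; the `Option`-based call site below is hypothetical, written only to illustrate the duplication being avoided, and the import path is an assumption:

```rust
use std::sync::Arc;

use object_store::path::Path;

// Assumed import path for the cache modified in this diff.
use lance_core::cache::FileMetadataCache;

// Before: every call site has to branch on Option<FileMetadataCache>.
fn lookup_old(cache: Option<&FileMetadataCache>, path: &Path) -> Option<Arc<u64>> {
    match cache {
        Some(cache) => cache.get::<u64>(path),
        // The "no cache" handling is duplicated at each call site.
        None => None,
    }
}

// After: a no-op cache keeps a single code path; get() always misses and
// insert() silently drops the value.
fn lookup_new(cache: &FileMetadataCache, path: &Path) -> Option<Arc<u64>> {
    cache.get::<u64>(path)
}

fn main() {
    let no_cache = FileMetadataCache::no_cache();
    assert!(lookup_new(&no_cache, &Path::from("meta")).is_none());
    assert!(lookup_old(None, &Path::from("meta")).is_none());
}
```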