feat: reconnect pushdown to v2 #2913

Merged · 3 commits · Sep 27, 2024
12 changes: 9 additions & 3 deletions python/src/file.rs
@@ -19,6 +19,7 @@ use arrow_schema::Schema as ArrowSchema;
use bytes::Bytes;
use futures::stream::StreamExt;
use lance::io::{ObjectStore, RecordBatchStream};
use lance_core::cache::FileMetadataCache;
use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
use lance_file::{
v2::{
@@ -331,9 +332,14 @@ impl LanceFileReader {
},
);
let file = scheduler.open_file(&path).await.infer_error()?;
-        let inner = FileReader::try_open(file, None, DecoderMiddlewareChain::default())
-            .await
-            .infer_error()?;
+        let inner = FileReader::try_open(
+            file,
+            None,
+            Arc::<DecoderMiddlewareChain>::default(),
+            &FileMetadataCache::no_cache(),
+        )
+        .await
+        .infer_error()?;
Ok(Self {
inner: Arc::new(inner),
})
89 changes: 78 additions & 11 deletions rust/lance-core/src/cache.rs
@@ -11,6 +11,7 @@ use futures::Future;
use moka::sync::Cache;
use object_store::path::Path;

use crate::utils::path::LancePathExt;
use crate::Result;

pub const DEFAULT_INDEX_CACHE_SIZE: usize = 128;
@@ -21,7 +22,7 @@ type ArcAny = Arc<dyn Any + Send + Sync>;
#[derive(Clone)]
struct SizedRecord {
record: ArcAny,
-    size_accessor: Arc<dyn Fn(ArcAny) -> usize + Send + Sync>,
+    size_accessor: Arc<dyn Fn(&ArcAny) -> usize + Send + Sync>,
}

impl std::fmt::Debug for SizedRecord {
@@ -35,7 +36,7 @@ impl std::fmt::Debug for SizedRecord {
impl SizedRecord {
fn new<T: DeepSizeOf + Send + Sync + 'static>(record: Arc<T>) -> Self {
let size_accessor =
-            |record: ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() };
+            |record: &ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() };
Self {
record,
size_accessor: Arc::new(size_accessor),
@@ -48,38 +49,104 @@ impl SizedRecord {
/// The cache is keyed by the file path and the type of metadata.
#[derive(Clone, Debug)]
pub struct FileMetadataCache {
-    cache: Arc<Cache<(Path, TypeId), SizedRecord>>,
+    cache: Option<Arc<Cache<(Path, TypeId), SizedRecord>>>,
Comment (Contributor Author): There were (and perhaps still are) a lot of spots that use Option<Cache> with None to represent "no cache". However, then you need special logic like:

let thing = if let Some(cache) = &self.cache {
    cache.find_or_insert(key, load_fn)
} else {
    load_fn()
};

Instead, I made it so you can cheaply create a FileMetadataCache::no_cache() which just never inserts anything.

base_path: Option<Path>,
}
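The `no_cache()` pattern from the comment above can be sketched in miniature with std types only. Everything here (`MetadataCache`, the `String` keys, the `Mutex<HashMap>` backing store) is illustrative, not Lance's actual API; the point is that the `Option` lives inside the cache, so call sites never branch:

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Simplified sketch: the backing store itself is optional, so `no_cache()`
/// makes every insert a no-op and every get a miss, with no special-casing
/// at call sites.
#[derive(Clone)]
pub struct MetadataCache {
    // `None` means "never cache anything".
    inner: Option<Arc<Mutex<HashMap<(String, TypeId), Arc<dyn Any + Send + Sync>>>>>,
}

impl MetadataCache {
    pub fn new() -> Self {
        Self {
            inner: Some(Arc::new(Mutex::new(HashMap::new()))),
        }
    }

    /// A dummy cache that silently drops every insert.
    pub fn no_cache() -> Self {
        Self { inner: None }
    }

    pub fn insert<T: Send + Sync + 'static>(&self, key: String, value: Arc<T>) {
        if let Some(inner) = &self.inner {
            inner.lock().unwrap().insert((key, TypeId::of::<T>()), value);
        }
    }

    pub fn get<T: Send + Sync + 'static>(&self, key: &str) -> Option<Arc<T>> {
        // For `no_cache()` this `?` returns None immediately: always a miss.
        let inner = self.inner.as_ref()?;
        inner
            .lock()
            .unwrap()
            .get(&(key.to_string(), TypeId::of::<T>()))
            .map(|v| v.clone().downcast::<T>().unwrap())
    }
}
```

Callers write `cache.get::<Foo>(key)` identically against a real cache or a `no_cache()`, which is exactly the branching the comment wants to avoid.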

impl DeepSizeOf for FileMetadataCache {
fn deep_size_of_children(&self, _: &mut Context) -> usize {
        self.cache
-            .iter()
-            .map(|(_, v)| (v.size_accessor)(v.record))
-            .sum()
+            .as_ref()
+            .map(|cache| {
+                cache
+                    .iter()
+                    .map(|(_, v)| (v.size_accessor)(&v.record))
+                    .sum()
+            })
+            .unwrap_or(0)
}
}

pub enum CapacityMode {
Items,
Bytes,
}
Comment on lines +70 to +73 (Contributor Author): I can back this out; I didn't really end up using it. At one point I thought there might be a dedicated cache for the v2 stuff (and we could prototype bytes mode with it) and then I ended up deciding to just use the existing dataset cache. Still, I do think we want to move that cache to bytes mode at some point, but I didn't want to make this a breaking change.

Reply (Contributor): My recent debugging experiences have made me really want to push on doing size-based (bytes-based) eviction. Doing it based on items isn't great for things that can be very large, like index partitions.

Reply (Contributor Author): Agreed. Size-based is much easier for users to wrap their heads around. Maybe we can switch everything over at some point, though we will want some benchmarking (if it's only calculating size on insert then it shouldn't be too terribly expensive, but if it is recalculating size for existing objects every time something new gets inserted then it could be expensive).


impl FileMetadataCache {
/// Instantiates a new cache which, for legacy reasons, uses Items capacity mode.
pub fn new(capacity: usize) -> Self {
Self {
-            cache: Arc::new(Cache::new(capacity as u64)),
+            cache: Some(Arc::new(Cache::new(capacity as u64))),
base_path: None,
}
}

/// Instantiates a dummy cache that will never cache anything.
pub fn no_cache() -> Self {
Self {
cache: None,
base_path: None,
}
}

/// Instantiates a new cache with a given capacity and capacity mode.
pub fn with_capacity(capacity: usize, mode: CapacityMode) -> Self {
match mode {
CapacityMode::Items => Self::new(capacity),
CapacityMode::Bytes => Self {
cache: Some(Arc::new(
Cache::builder()
.weigher(|_, v: &SizedRecord| {
(v.size_accessor)(&v.record).try_into().unwrap_or(u32::MAX)
})
.build(),
)),
base_path: None,
},
}
}
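The bytes-mode path above leans on moka's `weigher`, but the performance property debated in the thread — measure an entry's size once at insert time, never re-measure existing entries during eviction — can be sketched with std types only. This is a simplified FIFO cache with hypothetical names, not Lance's actual implementation:

```rust
use std::collections::VecDeque;

/// Std-only sketch of bytes-capacity eviction. The size is computed by the
/// caller once, stored alongside the value, and only *read* during eviction.
struct BytesCache<V> {
    capacity_bytes: usize,
    used_bytes: usize,
    entries: VecDeque<(String, V, usize)>, // (key, value, size) in FIFO order
}

impl<V> BytesCache<V> {
    fn new(capacity_bytes: usize) -> Self {
        Self {
            capacity_bytes,
            used_bytes: 0,
            entries: VecDeque::new(),
        }
    }

    fn insert(&mut self, key: String, value: V, size_bytes: usize) {
        self.used_bytes += size_bytes;
        self.entries.push_back((key, value, size_bytes));
        // Evict oldest entries until we fit; stored sizes are reused,
        // never recomputed, so eviction cost is independent of value size.
        while self.used_bytes > self.capacity_bytes {
            match self.entries.pop_front() {
                Some((_, _, evicted)) => self.used_bytes -= evicted,
                None => break,
            }
        }
    }
}
```

A real implementation would use LRU/LFU ordering (as moka does) rather than FIFO; the sketch only demonstrates the insert-time sizing.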

/// Creates a new cache which shares the same underlying cache but prepends `base_path` to all
/// keys.
pub fn with_base_path(&self, base_path: Path) -> Self {
Self {
cache: self.cache.clone(),
base_path: Some(base_path),
}
}

pub fn size(&self) -> usize {
-        self.cache.entry_count() as usize
+        if let Some(cache) = self.cache.as_ref() {
+            cache.entry_count() as usize
+        } else {
+            0
+        }
}

pub fn get<T: Send + Sync + 'static>(&self, path: &Path) -> Option<Arc<T>> {
-        self.cache
+        let cache = self.cache.as_ref()?;
let temp: Path;
let path = if let Some(base_path) = &self.base_path {
temp = base_path.child_path(path);
Comment on lines +129 to +130 (Contributor): Not worth optimizing yet, but I will note that I've seen flamegraphs of high-throughput servers show the path construction before getting something from a cache as a small hotspot.

Reply (Contributor Author): Yeah, I think string cloning and string building has the potential to become a very difficult-to-extract hotspot. Not entirely sure how to avoid it other than making cache keys configurable. There are a lot of pieces that will be making up these hash entries:

- File index
- Column index
- Encoding (e.g. stats)
- Specific item (e.g. zone maps)

That can probably be a Lance 2.2 problem 😆 (though this is just joking; we can change the cache keys without any change to format / backwards compatibility / etc.)

&temp
} else {
path
};
cache
.get(&(path.to_owned(), TypeId::of::<T>()))
.map(|metadata| metadata.record.clone().downcast::<T>().unwrap())
}

pub fn insert<T: DeepSizeOf + Send + Sync + 'static>(&self, path: Path, metadata: Arc<T>) {
-        self.cache
-            .insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
+        let Some(cache) = self.cache.as_ref() else {
+            return;
+        };
+        let path = if let Some(base_path) = &self.base_path {
+            base_path.child_path(&path)
+        } else {
+            path
+        };
+        cache.insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
}

/// Get an item
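As a thought experiment on the cache-key hotspot discussed in the thread above, a structured key built from the listed pieces (file index, column index, item kind) is `Copy` and hashes with no allocation, unlike a built-up path string. All names here are hypothetical, not part of Lance:

```rust
use std::collections::HashMap;

/// Hypothetical structured cache key: integer indices instead of strings,
/// so a lookup does no string construction or cloning.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum ItemKind {
    Stats,
    ZoneMaps,
}

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct MetadataKey {
    file_index: u32,
    column_index: u32,
    item: ItemKind,
}

fn demo() -> usize {
    let mut cache: HashMap<MetadataKey, Vec<u8>> = HashMap::new();
    let key = MetadataKey {
        file_index: 0,
        column_index: 3,
        item: ItemKind::ZoneMaps,
    };
    cache.insert(key, vec![1, 2, 3]);
    // Lookup reuses the same Copy key: no per-get allocation.
    cache.get(&key).map(|v| v.len()).unwrap_or(0)
}
```

The trade-off, as the thread notes, is that keys become less self-describing and the key scheme has to be agreed on by all cache users; it can change later without affecting the file format.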
1 change: 1 addition & 0 deletions rust/lance-core/src/utils.rs
@@ -7,6 +7,7 @@ pub mod cpu;
pub mod deletion;
pub mod futures;
pub mod mask;
pub mod path;
pub mod testing;
pub mod tokio;
pub mod tracing;
18 changes: 18 additions & 0 deletions rust/lance-core/src/utils/path.rs
@@ -0,0 +1,18 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use object_store::path::Path;

pub trait LancePathExt {
fn child_path(&self, path: &Path) -> Path;
Comment (Contributor Author): I couldn't figure out how to join two object_store::Path. If I missed something let me know.

Reply (Contributor): I don't think there is.

}

impl LancePathExt for Path {
fn child_path(&self, path: &Path) -> Path {
let mut new_path = self.clone();
for part in path.parts() {
new_path = new_path.child(part);
}
new_path
}
}
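The join in `child_path` appends each segment of the child path onto the base, one part at a time. A std-only sketch of the same algorithm over plain strings (illustrative only; `object_store::Path` additionally canonicalizes segments itself):

```rust
/// Append each segment of `child` onto `base`, mirroring the trait above
/// without depending on `object_store`.
fn child_path(base: &str, child: &str) -> String {
    let mut new_path = base.trim_end_matches('/').to_string();
    for part in child.split('/').filter(|p| !p.is_empty()) {
        new_path.push('/');
        new_path.push_str(part);
    }
    new_path
}
```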
39 changes: 24 additions & 15 deletions rust/lance-datafusion/src/substrait.rs
@@ -271,22 +271,31 @@ pub async fn parse_substrait(expr: &[u8], input_schema: Arc<Schema>) -> Result<E
}),
}?;

-    let (substrait_schema, input_schema, index_mapping) =
-        remove_extension_types(envelope.base_schema.as_ref().unwrap(), input_schema.clone())?;
+    let (substrait_schema, input_schema) =
+        if envelope.base_schema.as_ref().unwrap().r#struct.is_some() {
Comment on lines +274 to +275 (Contributor Author): The zone maps / substrait / datafusion stuff is a bit of a hack atm. I am trying really hard to avoid datafusion being a dependency of the file reader. Unfortunately, that means we need something to represent a filter expression in the file reader, and so I'm using substrait.

However, lance (the top-level / dataset package) IS using datafusion for its filters. So this means we go from a dataset filter (datafusion) to substrait and then (in the decoder plugin) back to a datafusion filter.

Ideally there would just be a lightweight expressions library I could use and, once I figure out cloning, it might be fun to build one.

In the meantime this hack exists because we need to encode DF filters into substrait somehow and, when I do so (in encode_substrait above), I've been lazy and not included the r#struct field, which we're using here to detect if extension types are used.

Reply (Contributor): "Ideally there would just be a lightweight expressions library I could use" — one could argue that DataFusion's goal is to be modular like that, and datafusion-expr could serve this purpose. However, I think the current list of dependencies makes it not so lightweight: https://crates.io/crates/datafusion-expr/42.0.0/dependencies

Reply (Contributor Author): Hmm, if we can make sqlparser optional and get rid of arrow (arrow-buffer and arrow-array are fine) then it would probably be fine (I don't know how big those datafusion-...-common crates are but hopefully they're pretty small). I'd also want to drag the substrait expression parsing into the crate (under a feature flag probably). I'll add it to my potential list of xmas break projects 😆

+        let (substrait_schema, input_schema, index_mapping) = remove_extension_types(
+            envelope.base_schema.as_ref().unwrap(),
+            input_schema.clone(),
+        )?;

-    if substrait_schema.r#struct.as_ref().unwrap().types.len()
-        != envelope
-            .base_schema
-            .as_ref()
-            .unwrap()
-            .r#struct
-            .as_ref()
-            .unwrap()
-            .types
-            .len()
-    {
-        remap_expr_references(&mut expr, &index_mapping)?;
-    }
+        if substrait_schema.r#struct.as_ref().unwrap().types.len()
+            != envelope
+                .base_schema
+                .as_ref()
+                .unwrap()
+                .r#struct
+                .as_ref()
+                .unwrap()
+                .types
+                .len()
+        {
+            remap_expr_references(&mut expr, &index_mapping)?;
+        }

(substrait_schema, input_schema)
} else {
(envelope.base_schema.as_ref().unwrap().clone(), input_schema)
};

// Datafusion's substrait consumer only supports Plan (not ExtendedExpression) and so
// we need to create a dummy plan with a single project node
55 changes: 25 additions & 30 deletions rust/lance-encoding-datafusion/src/lib.rs
@@ -1,7 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

-use std::sync::{Arc, Mutex};
+use std::{
+    collections::HashMap,
+    sync::{Arc, Mutex},
+};

use arrow_schema::DataType;
use lance_core::{
@@ -27,7 +30,7 @@ struct LanceDfFieldDecoderState {
rows_per_map: Option<u32>,
/// As we visit the decoding tree we populate this with the pushdown
/// information that is available.
-    zone_map_buffers: Vec<UnloadedPushdown>,
+    zone_map_buffers: HashMap<u32, UnloadedPushdown>,
}

/// This strategy is responsible for creating the field scheduler
@@ -60,15 +63,20 @@ impl LanceDfFieldDecoderStrategy {
if state.is_none() {
*state = Some(LanceDfFieldDecoderState {
rows_per_map: None,
-                zone_map_buffers: Vec::new(),
+                zone_map_buffers: HashMap::new(),
});
true
} else {
false
}
}

-    fn add_pushdown_field(&self, rows_per_map: u32, unloaded_pushdown: UnloadedPushdown) {
+    fn add_pushdown_field(
+        &self,
+        field: &Field,
+        rows_per_map: u32,
+        unloaded_pushdown: UnloadedPushdown,
+    ) {
let mut state = self.state.lock().unwrap();
let state = state.as_mut().unwrap();
match state.rows_per_map {
@@ -79,7 +87,9 @@ impl LanceDfFieldDecoderStrategy {
state.rows_per_map = Some(rows_per_map);
}
}
-        state.zone_map_buffers.push(unloaded_pushdown);
+        state
+            .zone_map_buffers
+            .insert(field.id as u32, unloaded_pushdown);
}
}

Expand All @@ -96,55 +106,40 @@ impl FieldDecoderStrategy for LanceDfFieldDecoderStrategy {
)> {
let is_root = self.initialize();

-        if let Some((rows_per_map, unloaded_pushdown)) = extract_zone_info(
-            column_infos.next().unwrap(),
-            &field.data_type(),
-            chain.current_path(),
-        ) {
+        if let Some((rows_per_map, unloaded_pushdown)) =
+            extract_zone_info(column_infos, &field.data_type(), chain.current_path())
+        {
             // If there is pushdown info then record it and unwrap the
             // pushdown encoding layer.
-            self.add_pushdown_field(rows_per_map, unloaded_pushdown);
+            self.add_pushdown_field(field, rows_per_map, unloaded_pushdown);
}
// Delegate to the rest of the chain to create the decoder
let (chain, next) = chain.next(field, column_infos, buffers)?;

// If this is the top level decoder then wrap it with our
// pushdown filtering scheduler.
-        let state = if is_root {
-            self.state.lock().unwrap().take()
-        } else {
-            None
-        };
-        let schema = self.schema.clone();
-        let _io = chain.io().clone();
-
-        let next = next?;
         if is_root {
-            let state = state.unwrap();
+            let state = self.state.lock().unwrap().take().unwrap();
+            let schema = self.schema.clone();
             let rows_per_map = state.rows_per_map;
             let zone_map_buffers = state.zone_map_buffers;
+            let next = next?;
             let num_rows = next.num_rows();
if rows_per_map.is_none() {
// No columns had any pushdown info
Ok((chain, Ok(next)))
} else {
-                let mut _scheduler = ZoneMapsFieldScheduler::new(
+                let scheduler = ZoneMapsFieldScheduler::new(
                     next,
                     schema,
                     zone_map_buffers,
                     rows_per_map.unwrap(),
                     num_rows,
                 );
-                // Load all the zone maps from disk
-                // TODO: it would be slightly more efficient to do this
-                // later when we know what columns are actually used
-                // for filtering.
-                // scheduler.initialize(io.as_ref()).await?;
-                // Ok(Arc::new(scheduler) as Arc<dyn FieldScheduler>)
-                todo!()
+                Ok((chain, Ok(Arc::new(scheduler))))
}
} else {
-            Ok((chain, Ok(next)))
+            Ok((chain, next))
}
}
}
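For context on what the reconnected pushdown buys: a zone map stores per-zone min/max statistics so a filter can rule out whole zones of rows before any data pages are read. A minimal std-only sketch of that pruning step, with simplified stand-in types (not Lance's actual `ZoneMapsFieldScheduler` API):

```rust
/// Per-zone summary: the min and max value of a column within one zone
/// (a fixed-size run of rows, `rows_per_map` in the PR).
struct ZoneMap {
    min: i64,
    max: i64,
}

/// Return the indices of zones that could contain rows equal to `target`.
/// Any zone whose [min, max] range excludes `target` is skipped entirely.
fn zones_to_read(maps: &[ZoneMap], target: i64) -> Vec<usize> {
    maps.iter()
        .enumerate()
        .filter(|(_, z)| z.min <= target && target <= z.max)
        .map(|(i, _)| i)
        .collect()
}
```

The scheduler in this PR does the same pruning, except the predicate arrives as a substrait expression (decoded back to a DataFusion filter, per the review thread) and the zone maps are themselves loaded lazily from column metadata buffers.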