Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable ObjectMeta Cache for ListingTable #10110

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ parking_lot = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
url = { workspace = true }
# Pinned to a semver-compatible range: wildcard ("*") requirements are
# unbounded and are rejected by crates.io when publishing.
sequence_trie = "0.3"
15 changes: 12 additions & 3 deletions datafusion/execution/src/cache/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use super::cache_unit::{self, DefaultFileStatisticsCache};
use crate::cache::CacheAccessor;
use datafusion_common::{Result, Statistics};
use object_store::path::Path;
use object_store::ObjectMeta;
use parking_lot::Mutex;
use parking_lot::RwLock;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

/// The cache of listing files statistics.
/// if set [`CacheManagerConfig::with_files_statistics_cache`]
/// Will avoid infer same file statistics repeatedly during the session lifetime,
Expand Down Expand Up @@ -73,7 +75,7 @@ impl CacheManager {
}
}

#[derive(Clone, Default)]
#[derive(Clone)]
pub struct CacheManagerConfig {
/// Enable cache of files statistics when listing files.
/// Avoids fetching the same file statistics repeatedly within the same DataFusion session.
Expand All @@ -87,7 +89,14 @@ pub struct CacheManagerConfig {
/// Enabled by default: `CacheManagerConfig::default()` installs a `TrieFileCache`.
pub list_files_cache: Option<ListFilesCache>,
}

impl Default for CacheManagerConfig {
    /// Builds the default configuration: no file-statistics cache, and a
    /// list-files cache backed by a freshly created `TrieFileCache`.
    fn default() -> Self {
        let trie_cache = Arc::new(cache_unit::TrieFileCache::default());
        Self {
            table_files_statistics_cache: None,
            list_files_cache: Some(trie_cache),
        }
    }
}
impl CacheManagerConfig {
pub fn with_files_statistics_cache(
mut self,
Expand Down
90 changes: 89 additions & 1 deletion datafusion/execution/src/cache/cache_unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use core::panic;
use std::sync::Arc;

use crate::cache::CacheAccessor;

use datafusion_common::tree_node::TreeNode;
use datafusion_common::Statistics;

use dashmap::DashMap;
Expand Down Expand Up @@ -156,10 +158,74 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
"DefaultListFilesCache".to_string()
}
}
use parking_lot::RwLock;
use sequence_trie::SequenceTrie;
/// A list-files cache that stores `Arc<Vec<ObjectMeta>>` values in a
/// [`SequenceTrie`] keyed by sequences of `char`s derived from a `Path`.
///
/// The trie is wrapped in an `RwLock` so it can be read and updated through
/// a shared reference.
#[derive(Default)]
pub struct TrieFileCache {
/// The underlying trie; every access goes through this lock.
stores: RwLock<SequenceTrie<char, Arc<Vec<ObjectMeta>>>>,
}
impl TrieFileCache {
    /// Converts `path` into the character sequence used as the trie key.
    ///
    /// The *entire* path string is used (not only its final segment), so two
    /// locations that share a file name but live in different directories,
    /// e.g. `a/test` and `b/test`, map to distinct cache entries.
    fn path_to_char_seq(path: &Path) -> Vec<char> {
        path.as_ref().chars().collect()
    }
}

impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for TrieFileCache {
    type Extra = ObjectMeta;

    /// Not supported by this cache.
    ///
    /// # Panics
    /// Always panics.
    fn get_with_extra(
        &self,
        _k: &Path,
        _e: &Self::Extra,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        panic!("Not supported TrieFileCache get_with_extra")
    }

    /// Returns the entry cached for `k`, if any.
    fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
        let key_seq = Self::path_to_char_seq(k);
        self.stores.read().get(&key_seq).cloned()
    }

    /// Stores `value` under `key`, returning the value it replaced, if any.
    fn put(
        &self,
        key: &Path,
        value: Arc<Vec<ObjectMeta>>,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        let key_seq = Self::path_to_char_seq(key);
        self.stores.write().insert(&key_seq, value)
    }

    /// Not supported by this cache.
    ///
    /// # Panics
    /// Always panics.
    fn put_with_extra(
        &self,
        _key: &Path,
        _value: Arc<Vec<ObjectMeta>>,
        _e: &Self::Extra,
    ) -> Option<Arc<Vec<ObjectMeta>>> {
        panic!("Not supported TrieFileCache put_with_extra")
    }

    /// Removes the entry for `key` and returns it, or `None` when nothing
    /// was cached under `key`.
    fn remove(&mut self, key: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
        let key_seq = Self::path_to_char_seq(key);
        // Hold one write guard for both the lookup and the removal so that
        // the returned value is exactly the entry that was dropped.
        let mut trie = self.stores.write();
        let removed = trie.get(&key_seq).cloned();
        if removed.is_some() {
            trie.remove(&key_seq);
        }
        removed
    }

    /// Returns `true` when an entry is cached for `k`.
    fn contains_key(&self, k: &Path) -> bool {
        let key_seq = Self::path_to_char_seq(k);
        self.stores.read().get(&key_seq).is_some()
    }

    /// Returns the number of cached entries by walking the trie's values.
    fn len(&self) -> usize {
        self.stores.read().values().count()
    }

    /// Drops every cached entry.
    fn clear(&self) {
        // `children()` returns a temporary `Vec` of references, so clearing
        // it would leave the trie untouched; replace the whole trie instead.
        *self.stores.write() = Default::default();
    }

    /// Returns the name of this cache implementation.
    fn name(&self) -> String {
        "TrieFileCache".to_string()
    }
}
#[cfg(test)]
mod tests {
use crate::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
use crate::cache::cache_unit::{
DefaultFileStatisticsCache, DefaultListFilesCache, TrieFileCache,
};
use crate::cache::CacheAccessor;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use chrono::DateTime;
Expand Down Expand Up @@ -232,4 +298,26 @@ mod tests {
meta.clone()
);
}

#[test]
fn test_trie_cache() {
    // Build a single object entry located at "test".
    let last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
        .unwrap()
        .into();
    let meta = ObjectMeta {
        location: Path::from("test"),
        last_modified,
        size: 1024,
        e_tag: None,
        version: None,
    };
    let cache = TrieFileCache::default();

    // A fresh cache holds nothing for this location.
    assert!(cache.get(&meta.location).is_none());

    // After a put, the same location yields the stored metadata.
    cache.put(&meta.location, vec![meta.clone()].into());
    let cached = cache.get(&meta.location).unwrap();
    assert_eq!(cached.first().unwrap().clone(), meta.clone());
}
}
Loading