diff --git a/Cargo.lock b/Cargo.lock index 1dfc250c11..c2a92965b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1333,6 +1333,7 @@ dependencies = [ "murmur3", "paste 1.0.12", "prost", + "seahash", "serde", "serde_json", "snafu 0.6.10", diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index 789a51b7ab..c6dd982ee2 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -498,6 +498,7 @@ fn open_storage( opts.disk_cache_capacity.as_byte() as usize, opts.disk_cache_page_size.as_byte() as usize, store, + opts.disk_cache_partition_bits, ) .await .context(OpenObjectStore)?, diff --git a/analytic_engine/src/storage_options.rs b/analytic_engine/src/storage_options.rs index 28e4bba20b..f9558c2736 100644 --- a/analytic_engine/src/storage_options.rs +++ b/analytic_engine/src/storage_options.rs @@ -14,9 +14,11 @@ pub struct StorageOptions { pub mem_cache_capacity: ReadableSize, pub mem_cache_partition_bits: usize, // 0 means disable disk cache - // Note: disk_cache_capacity % disk_cache_page_size should be 0 + // Note: disk_cache_capacity % (disk_cache_page_size * (1 << disk_cache_partition_bits)) should + // be 0 pub disk_cache_capacity: ReadableSize, pub disk_cache_page_size: ReadableSize, + pub disk_cache_partition_bits: usize, pub disk_cache_dir: String, pub object_store: ObjectStoreOptions, } @@ -31,6 +33,7 @@ impl Default for StorageOptions { disk_cache_dir: root_path.clone(), disk_cache_capacity: ReadableSize::gb(0), disk_cache_page_size: ReadableSize::mb(2), + disk_cache_partition_bits: 4, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: root_path, }), diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index 4981da435e..1cb4e25cac 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -428,6 +428,7 @@ impl Builder { disk_cache_dir: "".to_string(), disk_cache_capacity: ReadableSize::mb(0), disk_cache_page_size: ReadableSize::mb(0), + disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), }), @@ -490,6 +491,7 @@ impl Default for RocksDBEngineBuildContext { disk_cache_dir: "".to_string(), disk_cache_capacity: ReadableSize::mb(0), disk_cache_page_size: ReadableSize::mb(0), + disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), }), @@ -517,6 +519,7 @@ impl Clone for RocksDBEngineBuildContext { disk_cache_dir: "".to_string(), disk_cache_capacity: ReadableSize::mb(0), disk_cache_page_size: ReadableSize::mb(0), + disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), }), @@ -560,6 +563,7 @@ impl Default for MemoryEngineBuildContext { disk_cache_dir: "".to_string(), disk_cache_capacity: ReadableSize::mb(0), disk_cache_page_size: ReadableSize::mb(0), + disk_cache_partition_bits: 0, object_store: ObjectStoreOptions::Local(LocalOptions { data_dir: dir.path().to_str().unwrap().to_string(), }), diff --git a/common_types/Cargo.toml b/common_types/Cargo.toml index 2ffc52188e..5f9a0c5b6b 100644 --- a/common_types/Cargo.toml +++ b/common_types/Cargo.toml @@ -27,6 +27,7 @@ datafusion = { workspace = true, optional = true } murmur3 = "0.4.1" paste = { workspace = true } prost = { workspace = true } +seahash = "4.1.0" serde = { workspace = true } serde_json = { workspace = true } snafu = { workspace = true } diff --git a/common_types/src/hash.rs b/common_types/src/hash.rs index 2094a0ab0d..d9fd04c5f9 100644 --- a/common_types/src/hash.rs +++ b/common_types/src/hash.rs @@ -16,6 +16,7 @@ use std::hash::BuildHasher; use ahash::AHasher; use byteorder::{ByteOrder, LittleEndian}; use murmur3::murmur3_x64_128; +pub use seahash::SeaHasher; pub fn hash64(mut bytes: &[u8]) -> u64 { let mut out = [0; 16]; murmur3_x64_128(&mut bytes, 0, &mut out); diff --git a/common_util/src/partitioned_lock.rs b/common_util/src/partitioned_lock.rs index da22cb3e7d..da7b5dd7a1 100644 --- a/common_util/src/partitioned_lock.rs +++ b/common_util/src/partitioned_lock.rs @@ -7,7 +7,9 @@ use std::{ sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}, }; -use common_types::hash::build_fixed_seed_ahasher; +use common_types::hash::{build_fixed_seed_ahasher, SeaHasher}; +use tokio; + /// Simple partitioned `RwLock` pub struct PartitionedRwLock { partitions: Vec>, @@ -100,6 +102,45 @@ impl PartitionedMutex { } } +#[derive(Debug)] +pub struct PartitionedMutexAsync { + partitions: Vec>, + partition_mask: usize, +} + +impl PartitionedMutexAsync { + pub fn new(init_fn: F, partition_bit: usize) -> Self + where + F: Fn() -> T, + { + let partition_num = 1 << partition_bit; + let partitions = (0..partition_num) + .map(|_| tokio::sync::Mutex::new(init_fn())) + .collect::>>(); + Self { + partitions, + partition_mask: partition_num - 1, + } + } + + pub async fn lock(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> { + let mutex = self.get_partition(key); + + mutex.lock().await + } + + fn get_partition(&self, key: &K) -> &tokio::sync::Mutex { + let mut hasher = SeaHasher::new(); + key.hash(&mut hasher); + &self.partitions[(hasher.finish() as usize) & self.partition_mask] + } + + #[cfg(test)] + async fn get_partition_by_index(&self, index: usize) -> &tokio::sync::Mutex { + &self.partitions[index] + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -142,6 +183,24 @@ mod tests { } } + #[tokio::test] + async fn test_partitioned_mutex_async() { + let init_hmap = HashMap::new; + let test_locked_map = PartitionedMutexAsync::new(init_hmap, 4); + let test_key = "test_key".to_string(); + let test_value = "test_value".to_string(); + + { + let mut map = test_locked_map.lock(&test_key).await; + map.insert(test_key.clone(), test_value.clone()); + } + + { + let map = test_locked_map.lock(&test_key).await; + assert_eq!(map.get(&test_key).unwrap(), &test_value); + } + } + #[test] fn test_partitioned_mutex_vis_different_partition() { let init_vec = Vec::::new; @@ -168,4 +227,18 @@ mod tests { assert!(mutex_second_try_lock.try_write().is_ok()); assert!(mutex_first.try_write().is_err()); } + + #[tokio::test] + async fn test_partitioned_mutex_async_vis_different_partition() { + let init_vec = Vec::::new; + let test_locked_map = PartitionedMutexAsync::new(init_vec, 4); + let mutex_first = test_locked_map.get_partition_by_index(0).await; + + let mut _tmp_data = mutex_first.lock().await; + assert!(mutex_first.try_lock().is_err()); + + let mutex_second = test_locked_map.get_partition_by_index(1).await; + assert!(mutex_second.try_lock().is_ok()); + assert!(mutex_first.try_lock().is_err()); + } } diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index 09ef61eecd..a87a20b406 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -11,7 +11,7 @@ use std::{collections::BTreeMap, fmt::Display, ops::Range, sync::Arc}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; -use common_util::time::current_as_rfc3339; +use common_util::{partitioned_lock::PartitionedMutexAsync, time::current_as_rfc3339}; use crc::{Crc, CRC_32_ISCSI}; use futures::stream::BoxStream; use log::{debug, error, info}; @@ -93,6 +93,8 @@ enum Error { source: prost::DecodeError, backtrace: Backtrace, }, + #[snafu(display("disk cache cap must large than 0",))] + InvalidCapacity, } impl From for ObjectStoreError { @@ -117,15 +119,16 @@ struct DiskCache { root_dir: String, cap: usize, // Cache key is used as filename on disk. - cache: Mutex>, + cache: PartitionedMutexAsync>, } impl DiskCache { - fn new(root_dir: String, cap: usize) -> Self { + fn new(root_dir: String, cap: usize, partition_bits: usize) -> Self { + let init_lru = || LruCache::new(cap); Self { root_dir, cap, - cache: Mutex::new(LruCache::new(cap)), + cache: PartitionedMutexAsync::new(init_lru, partition_bits), } } @@ -134,7 +137,7 @@ impl DiskCache { /// The returned value denotes whether succeed. // TODO: We now hold lock when doing IO, possible to release it? async fn update_cache(&self, key: String, value: Option) -> bool { - let mut cache = self.cache.lock().await; + let mut cache = self.cache.lock(&key).await; debug!( "Disk cache update, key:{}, len:{}, cap:{}.", &key, @@ -180,7 +183,7 @@ impl DiskCache { } async fn get(&self, key: &str) -> Option { - let mut cache = self.cache.lock().await; + let mut cache = self.cache.lock(&key).await; if cache.get(key).is_some() { // TODO: release lock when doing IO match self.read_bytes(key).await { @@ -286,11 +289,16 @@ impl DiskCacheStore { cap: usize, page_size: usize, underlying_store: Arc, + partition_bits: usize, ) -> Result { - assert!(cap % page_size == 0); - + ensure!( + cap % (page_size * (1 << partition_bits)) == 0, + InvalidCapacity + ); + let cap_per_part = cap / page_size / (1 << partition_bits); + ensure!(cap_per_part != 0, InvalidCapacity); let _ = Self::create_manifest_if_not_exists(&cache_dir, page_size).await?; - let cache = DiskCache::new(cache_dir.clone(), cap / page_size); + let cache = DiskCache::new(cache_dir.clone(), cap_per_part, partition_bits); Self::recover_cache(&cache_dir, &cache).await?; let size_cache = Arc::new(Mutex::new(LruCache::new(cap / page_size))); @@ -536,7 +544,11 @@ mod test { cache_dir: TempDir, } - async fn prepare_store(page_size: usize, cap: usize) -> StoreWithCacheDir { + async fn prepare_store( + page_size: usize, + cap: usize, + partition_bits: usize, + ) -> StoreWithCacheDir { let local_path = tempdir().unwrap(); let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); @@ -546,6 +558,7 @@ mod test { cap, page_size, local_store, + partition_bits, ) .await .unwrap(); @@ -570,7 +583,7 @@ mod test { ), ]; - let store = prepare_store(page_size, 1024).await; + let store = prepare_store(page_size, 1024, 0).await; for (input, expected) in testcases { assert_eq!(store.inner.normalize_range(1024, &input), expected); } @@ -587,7 +600,7 @@ mod test { (32..100, vec![]), ]; - let store = prepare_store(page_size, 1024).await; + let store = prepare_store(page_size, 1024, 0).await; for (input, expected) in testcases { assert_eq!(store.inner.normalize_range(20, &input), expected); } @@ -606,7 +619,7 @@ mod test { // 51 byte let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; let location = Path::from("1.sst"); - let store = prepare_store(page_size, 1024).await; + let store = prepare_store(page_size, 1024, 0).await; let mut buf = BytesMut::with_capacity(data.len() * 4); // extend 4 times, then location will contain 200 bytes @@ -634,13 +647,24 @@ mod test { // remove cached values, then get again { - let mut data_cache = store.inner.cache.cache.lock().await; for range in vec![0..16, 16..32, 32..48, 48..64, 64..80, 80..96, 96..112] { + let data_cache = store + .inner + .cache + .cache + .lock(&DiskCacheStore::cache_key(&location, &range).as_str()) + .await; assert!(data_cache.contains(DiskCacheStore::cache_key(&location, &range).as_str())); assert!(test_file_exists(&store.cache_dir, &location, &range)); } for range in vec![16..32, 48..64, 80..96] { + let mut data_cache = store + .inner + .cache + .cache + .lock(&DiskCacheStore::cache_key(&location, &range).as_str()) + .await; assert!(data_cache .pop(&DiskCacheStore::cache_key(&location, &range)) .is_some()); @@ -661,7 +685,7 @@ mod test { // 51 byte let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; let location = Path::from("remove_cache_file.sst"); - let store = prepare_store(page_size, 32).await; + let store = prepare_store(page_size, 32, 0).await; let mut buf = BytesMut::with_capacity(data.len() * 4); // extend 4 times, then location will contain 200 bytes, but cache cap is 32 for _ in 0..4 { @@ -686,6 +710,70 @@ mod test { assert!(test_file_exists(&store.cache_dir, &location, &(48..64))); } + #[tokio::test] + async fn test_disk_cache_remove_cache_file_two_partition() { + let page_size = 16; + // 51 byte + let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z"; + let location = Path::from("remove_cache_file_two_partition.sst"); + // partition_cap: 64 / 16 / 2 = 2 + let store = prepare_store(page_size, 64, 1).await; + let mut buf = BytesMut::with_capacity(data.len() * 8); + // extend 8 times + for _ in 0..8 { + buf.extend_from_slice(data); + } + store.inner.put(&location, buf.freeze()).await.unwrap(); + // use seahash + // 0..16: partition 1 + // 16..32 partition 1 + // 32..48 partition 0 + // 48..64 partition 1 + // 64..80 partition 1 + // 80..96 partition 0 + // 96..112 partition 0 + // 112..128 partition 0 + // 128..144 partition 0 + let _ = store.inner.get_range(&location, 0..16).await.unwrap(); + let _ = store.inner.get_range(&location, 16..32).await.unwrap(); + // partition 1 cache is full now + assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); + assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); + + let _ = store.inner.get_range(&location, 32..48).await.unwrap(); + let _ = store.inner.get_range(&location, 80..96).await.unwrap(); + // partition 0 cache is full now + + assert!(test_file_exists(&store.cache_dir, &location, &(32..48))); + assert!(test_file_exists(&store.cache_dir, &location, &(80..96))); + + // insert new entry into partition 0, evict partition 0's oldest entry + let _ = store.inner.get_range(&location, 96..112).await.unwrap(); + assert!(!test_file_exists(&store.cache_dir, &location, &(32..48))); + assert!(test_file_exists(&store.cache_dir, &location, &(80..96))); + + assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); + assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); + + // insert new entry into partition 0, evict partition 0's oldest entry + let _ = store.inner.get_range(&location, 128..144).await.unwrap(); + assert!(!test_file_exists(&store.cache_dir, &location, &(80..96))); + assert!(test_file_exists(&store.cache_dir, &location, &(96..112))); + assert!(test_file_exists(&store.cache_dir, &location, &(128..144))); + + assert!(test_file_exists(&store.cache_dir, &location, &(0..16))); + assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); + + // insert new entry into partition 1, evict partition 1's oldest entry + let _ = store.inner.get_range(&location, 64..80).await.unwrap(); + assert!(!test_file_exists(&store.cache_dir, &location, &(0..16))); + assert!(test_file_exists(&store.cache_dir, &location, &(16..32))); + assert!(test_file_exists(&store.cache_dir, &location, &(64..80))); + + assert!(test_file_exists(&store.cache_dir, &location, &(96..112))); + assert!(test_file_exists(&store.cache_dir, &location, &(128..144))); + } + #[tokio::test] async fn test_disk_cache_manifest() { let cache_dir = tempdir().unwrap(); @@ -696,7 +784,7 @@ mod test { let local_path = tempdir().unwrap(); let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new(cache_root_dir.clone(), 160, 8, local_store) + DiskCacheStore::try_new(cache_root_dir.clone(), 160, 8, local_store, 0) .await .unwrap() }; @@ -716,7 +804,7 @@ mod test { let local_path = tempdir().unwrap(); let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new(cache_root_dir.clone(), 160, 8, local_store) + DiskCacheStore::try_new(cache_root_dir.clone(), 160, 8, local_store, 0) .await .unwrap() }; @@ -740,6 +828,7 @@ mod test { 160, page_size * 2, local_store, + 0, ) .await; @@ -758,7 +847,7 @@ mod test { let local_path = tempdir().unwrap(); let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new(cache_root_dir.clone(), 10240, page_size, local_store) + DiskCacheStore::try_new(cache_root_dir.clone(), 10240, page_size, local_store, 0) .await .unwrap() }; @@ -781,12 +870,16 @@ mod test { let local_path = tempdir().unwrap(); let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap()); - DiskCacheStore::try_new(cache_root_dir.clone(), 160, page_size, local_store) + DiskCacheStore::try_new(cache_root_dir.clone(), 160, page_size, local_store, 0) .await .unwrap() }; - let cache = store.cache.cache.lock().await; for range in vec![16..32, 32..48, 48..64, 64..80, 80..96, 96..112] { + let cache = store + .cache + .cache + .lock(&DiskCacheStore::cache_key(&location, &range).as_str()) + .await; assert!(cache.contains(&DiskCacheStore::cache_key(&location, &range))); assert!(test_file_exists(&cache_dir, &location, &range)); } @@ -808,7 +901,7 @@ mod test { let StoreWithCacheDir { inner: store, cache_dir, - } = prepare_store(16, 1024).await; + } = prepare_store(16, 1024, 0).await; let test_file_name = "corrupted_disk_cache_file"; let test_file_path = Path::from(test_file_name); let test_file_bytes = Bytes::from("corrupted_disk_cache_file_data"); diff --git a/components/object_store/src/mem_cache.rs b/components/object_store/src/mem_cache.rs index d28680be06..926f0795fa 100644 --- a/components/object_store/src/mem_cache.rs +++ b/components/object_store/src/mem_cache.rs @@ -54,9 +54,9 @@ impl MemCache { let cap_per_part = NonZeroUsize::new((mem_cap.get() as f64 / partition_num as f64) as usize) .context(InvalidCapacity)?; - let inin_lru = + let init_lru = || CLruCache::with_config(CLruCacheConfig::new(cap_per_part).with_scale(CustomScale)); - let inner = PartitionedMutex::new(inin_lru, partition_bits); + let inner = PartitionedMutex::new(init_lru, partition_bits); Ok(Self { mem_cap, inner }) }