Skip to content

Commit

Permalink
refactor: disk cache use partition lock (apache#974)
Browse files Browse the repository at this point in the history
## Rationale
Close apache#914

## Detailed Changes
Use a partitioned lock in the disk cache.

## Test Plan
Add unit tests.
  • Loading branch information
tanruixiang authored and dust1 committed Aug 9, 2023
1 parent d640c5b commit a1bfe42
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions analytic_engine/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ fn open_storage(
opts.disk_cache_capacity.as_byte() as usize,
opts.disk_cache_page_size.as_byte() as usize,
store,
opts.disk_cache_partition_bits,
)
.await
.context(OpenObjectStore)?,
Expand Down
5 changes: 4 additions & 1 deletion analytic_engine/src/storage_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ pub struct StorageOptions {
pub mem_cache_capacity: ReadableSize,
pub mem_cache_partition_bits: usize,
// 0 means disable disk cache
// Note: disk_cache_capacity % disk_cache_page_size should be 0
// Note: disk_cache_capacity % (disk_cache_page_size * (1 << disk_cache_partition_bits)) should
// be 0
pub disk_cache_capacity: ReadableSize,
pub disk_cache_page_size: ReadableSize,
pub disk_cache_partition_bits: usize,
pub disk_cache_dir: String,
pub object_store: ObjectStoreOptions,
}
Expand All @@ -31,6 +33,7 @@ impl Default for StorageOptions {
disk_cache_dir: root_path.clone(),
disk_cache_capacity: ReadableSize::gb(0),
disk_cache_page_size: ReadableSize::mb(2),
disk_cache_partition_bits: 4,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: root_path,
}),
Expand Down
4 changes: 4 additions & 0 deletions analytic_engine/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ impl Builder {
disk_cache_dir: "".to_string(),
disk_cache_capacity: ReadableSize::mb(0),
disk_cache_page_size: ReadableSize::mb(0),
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
}),
Expand Down Expand Up @@ -490,6 +491,7 @@ impl Default for RocksDBEngineBuildContext {
disk_cache_dir: "".to_string(),
disk_cache_capacity: ReadableSize::mb(0),
disk_cache_page_size: ReadableSize::mb(0),
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
}),
Expand Down Expand Up @@ -517,6 +519,7 @@ impl Clone for RocksDBEngineBuildContext {
disk_cache_dir: "".to_string(),
disk_cache_capacity: ReadableSize::mb(0),
disk_cache_page_size: ReadableSize::mb(0),
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
}),
Expand Down Expand Up @@ -560,6 +563,7 @@ impl Default for MemoryEngineBuildContext {
disk_cache_dir: "".to_string(),
disk_cache_capacity: ReadableSize::mb(0),
disk_cache_page_size: ReadableSize::mb(0),
disk_cache_partition_bits: 0,
object_store: ObjectStoreOptions::Local(LocalOptions {
data_dir: dir.path().to_str().unwrap().to_string(),
}),
Expand Down
1 change: 1 addition & 0 deletions common_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ datafusion = { workspace = true, optional = true }
murmur3 = "0.4.1"
paste = { workspace = true }
prost = { workspace = true }
seahash = "4.1.0"
serde = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions common_types/src/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::hash::BuildHasher;
use ahash::AHasher;
use byteorder::{ByteOrder, LittleEndian};
use murmur3::murmur3_x64_128;
pub use seahash::SeaHasher;
pub fn hash64(mut bytes: &[u8]) -> u64 {
let mut out = [0; 16];
murmur3_x64_128(&mut bytes, 0, &mut out);
Expand Down
75 changes: 74 additions & 1 deletion common_util/src/partitioned_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use std::{
sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

use common_types::hash::build_fixed_seed_ahasher;
use common_types::hash::{build_fixed_seed_ahasher, SeaHasher};
use tokio;

/// Simple partitioned `RwLock`
pub struct PartitionedRwLock<T> {
partitions: Vec<RwLock<T>>,
Expand Down Expand Up @@ -100,6 +102,45 @@ impl<T> PartitionedMutex<T> {
}
}

/// Simple partitioned async `Mutex`, backed by `tokio::sync::Mutex`.
///
/// A key is hashed and the hash is masked with `partition_mask` to pick the
/// partition that guards it.
#[derive(Debug)]
pub struct PartitionedMutexAsync<T> {
// One independently lockable slot per partition.
partitions: Vec<tokio::sync::Mutex<T>>,
// `partition_num - 1`; since the partition count is a power of two, `hash & mask`
// always yields a valid index into `partitions`.
partition_mask: usize,
}

impl<T> PartitionedMutexAsync<T> {
    /// Builds `1 << partition_bit` partitions, calling `init_fn` once per
    /// partition to produce the value each mutex protects.
    pub fn new<F>(init_fn: F, partition_bit: usize) -> Self
    where
        F: Fn() -> T,
    {
        let partition_num = 1 << partition_bit;
        let partitions: Vec<tokio::sync::Mutex<T>> =
            std::iter::repeat_with(|| tokio::sync::Mutex::new(init_fn()))
                .take(partition_num)
                .collect();

        Self {
            partitions,
            // The partition count is a power of two, so `count - 1` is an all-ones mask.
            partition_mask: partition_num - 1,
        }
    }

    /// Waits for, and returns the guard of, the partition that `key` hashes to.
    pub async fn lock<K: Eq + Hash>(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> {
        self.get_partition(key).lock().await
    }

    /// Hashes `key` with `SeaHasher` and masks the result down to a partition index.
    fn get_partition<K: Eq + Hash>(&self, key: &K) -> &tokio::sync::Mutex<T> {
        let mut state = SeaHasher::new();
        key.hash(&mut state);
        let slot = (state.finish() as usize) & self.partition_mask;

        &self.partitions[slot]
    }

    /// Test-only direct access to a partition by its position.
    #[cfg(test)]
    async fn get_partition_by_index(&self, index: usize) -> &tokio::sync::Mutex<T> {
        &self.partitions[index]
    }
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down Expand Up @@ -142,6 +183,24 @@ mod tests {
}
}

#[tokio::test]
async fn test_partitioned_mutex_async() {
    let locked_maps = PartitionedMutexAsync::new(HashMap::new, 4);
    let key = "test_key".to_string();
    let value = "test_value".to_string();

    // Write through one guard and release it before locking the same partition again.
    {
        let mut guard = locked_maps.lock(&key).await;
        guard.insert(key.clone(), value.clone());
    }

    // The same key must map to the same partition, so the entry is visible here.
    {
        let guard = locked_maps.lock(&key).await;
        assert_eq!(guard.get(&key).unwrap(), &value);
    }
}

#[test]
fn test_partitioned_mutex_vis_different_partition() {
let init_vec = Vec::<i32>::new;
Expand All @@ -168,4 +227,18 @@ mod tests {
assert!(mutex_second_try_lock.try_write().is_ok());
assert!(mutex_first.try_write().is_err());
}

#[tokio::test]
async fn test_partitioned_mutex_async_vis_different_partition() {
    let partitioned = PartitionedMutexAsync::new(Vec::<i32>::new, 4);

    // Take partition 0 and keep its guard alive until the end of the test.
    let first = partitioned.get_partition_by_index(0).await;
    let _held_guard = first.lock().await;
    assert!(first.try_lock().is_err());

    // Partition 1 can still be locked while partition 0 stays held.
    let second = partitioned.get_partition_by_index(1).await;
    assert!(second.try_lock().is_ok());
    assert!(first.try_lock().is_err());
}
}
Loading

0 comments on commit a1bfe42

Please sign in to comment.