diff --git a/libindy_vdr/Cargo.toml b/libindy_vdr/Cargo.toml index ae5d95f0..cb8046c5 100644 --- a/libindy_vdr/Cargo.toml +++ b/libindy_vdr/Cargo.toml @@ -60,8 +60,6 @@ thiserror = "1.0" time = { version = "=0.3.20", features = ["parsing"] } url = "2.2.2" zmq = "0.9" -async-trait = "0.1.77" -async-lock = "3.3.0" sled = "0.34.7" [dev-dependencies] diff --git a/libindy_vdr/src/pool/cache/mod.rs b/libindy_vdr/src/pool/cache/mod.rs index 7950eb20..fea48abf 100644 --- a/libindy_vdr/src/pool/cache/mod.rs +++ b/libindy_vdr/src/pool/cache/mod.rs @@ -1,17 +1,17 @@ -use async_lock::RwLock; -use async_trait::async_trait; -use std::{fmt::Display, sync::Arc}; +use std::{ + fmt::Display, + sync::{Arc, RwLock}, +}; pub mod storage; pub mod strategy; -#[async_trait] pub trait CacheStrategy: Send + Sync + 'static { - async fn get(&self, key: &K) -> Option; + fn get(&self, key: &K) -> Option; - async fn remove(&mut self, key: &K) -> Option; + fn remove(&self, key: &K) -> Option; - async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option) -> Option; + fn insert(&self, key: K, value: V, custom_exp_offset: Option) -> Option; } pub struct Cache { @@ -34,23 +34,28 @@ impl Cache { } } - pub async fn get(&self, key: &K) -> Option { + pub fn get(&self, key: &K) -> Option { let full_key = self.full_key(key); - self.storage.read().await.get(&full_key).await + if let Ok(storage) = self.storage.read() { + return storage.get(&full_key); + } + None } - pub async fn remove(&self, key: &K) -> Option { + pub fn remove(&self, key: &K) -> Option { let full_key = self.full_key(key); - self.storage.write().await.remove(&full_key).await + if let Ok(storage) = self.storage.write() { + return storage.remove(&full_key); + } + None } - pub async fn insert(&self, key: K, value: V, custom_exp_offset: Option) -> Option { + pub fn insert(&self, key: K, value: V, custom_exp_offset: Option) -> Option { let full_key = self.full_key(&key); - self.storage - .write() - .await - .insert(full_key, value, custom_exp_offset) - .await + if let Ok(storage) = self.storage.write() { + return storage.insert(full_key, value, custom_exp_offset); + } + None } } diff --git a/libindy_vdr/src/pool/cache/strategy.rs b/libindy_vdr/src/pool/cache/strategy.rs index 1f6a2fca..0cf4b2fe 100644 --- a/libindy_vdr/src/pool/cache/strategy.rs +++ b/libindy_vdr/src/pool/cache/strategy.rs @@ -1,8 +1,13 @@ use super::storage::OrderedHashMap; use super::CacheStrategy; -use async_lock::Mutex; -use async_trait::async_trait; -use std::{collections::BTreeMap, fmt::Debug, hash::Hash, sync::Arc, time::SystemTime}; +use std::{ + collections::BTreeMap, + fmt::Debug, + hash::Hash, + ops::Deref, + sync::{Arc, Mutex}, + time::SystemTime, +}; /// A simple struct to hold a value and the expiry offset /// needed because items can be inserted with custom ttl values @@ -50,89 +55,94 @@ impl CacheStrategy for Arc> { - async fn get(&self, key: &K) -> Option { - self.get(key).await + fn get(&self, key: &K) -> Option { + self.deref().get(key) } - async fn remove(&mut self, key: &K) -> Option { - self.remove(key).await + fn remove(&self, key: &K) -> Option { + self.deref().remove(key) } - async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option) -> Option { - self.insert(key, value, custom_exp_offset).await + fn insert(&self, key: K, value: V, custom_exp_offset: Option) -> Option { + self.deref().insert(key, value, custom_exp_offset) } } -#[async_trait] impl CacheStrategy for CacheStrategyTTL { - async fn get(&self, key: &K) -> Option { - let mut store_lock = self.store.lock().await; - let current_time = SystemTime::now() - .duration_since(self.create_time) - .unwrap() - .as_millis(); - let get_res = match store_lock.get(key) { - Some((ts, v)) => { - if current_time < *ts { - Some((*ts, v.clone())) - } else { - store_lock.remove(key); - None + fn get(&self, key: &K) -> Option { + if let Some(mut store_lock) = self.store.lock().ok() { + let current_time = SystemTime::now() + .duration_since(self.create_time) + .unwrap() + .as_millis(); + let get_res = match store_lock.get(key) { + Some((ts, v)) => { + if current_time < *ts { + Some((*ts, v.clone())) + } else { + store_lock.remove(key); + None + } } + None => None, + }; + // update the timestamp if the entry is still valid + if let Some((_, ref v)) = get_res { + store_lock.re_order(key, current_time + v.expire_offset); } - None => None, - }; - // update the timestamp if the entry is still valid - if let Some((_, ref v)) = get_res { - store_lock.re_order(key, current_time + v.expire_offset); + return get_res.map(|(_, v)| v.value); } - get_res.map(|(_, v)| v.value) + None } - async fn remove(&mut self, key: &K) -> Option { - self.store.lock().await.remove(key).map(|(_, v)| v.value) + fn remove(&self, key: &K) -> Option { + if let Some(mut store) = self.store.lock().ok() { + return store.remove(key).map(|(_, v)| v.value); + } + None } - async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option) -> Option { - let mut store_lock = self.store.lock().await; - let current_ts = SystemTime::now() - .duration_since(self.create_time) - .unwrap() - .as_millis(); - - // remove expired entries - while store_lock.len() > 0 - && store_lock - .get_first_key_value() - .map(|(_, ts, _)| ts.clone() < current_ts) - .unwrap_or(false) - { - store_lock.remove_first(); - } + fn insert(&self, key: K, value: V, custom_exp_offset: Option) -> Option { + if let Some(mut store_lock) = self.store.lock().ok() { + let current_ts = SystemTime::now() + .duration_since(self.create_time) + .unwrap() + .as_millis(); - // remove the oldest item if the cache is still full - if store_lock.len() >= self.capacity && store_lock.get(&key).is_none() { - // remove the oldest item - let removal_key = store_lock.get_first_key_value().map(|(k, _, _)| k.clone()); - if let Some(removal_key) = removal_key { - store_lock.remove(&removal_key); + // remove expired entries + while store_lock.len() > 0 + && store_lock + .get_first_key_value() + .map(|(_, ts, _)| ts.clone() < current_ts) + .unwrap_or(false) + { + store_lock.remove_first(); } - }; - let exp_offset = custom_exp_offset.unwrap_or(self.expire_after); - store_lock - .insert( - key, - TTLCacheItem { - value: value, - expire_offset: exp_offset, - }, - current_ts + exp_offset, - ) - .map(|v| v.value) + // remove the oldest item if the cache is still full + if store_lock.len() >= self.capacity && store_lock.get(&key).is_none() { + // remove the oldest item + let removal_key = store_lock.get_first_key_value().map(|(k, _, _)| k.clone()); + if let Some(removal_key) = removal_key { + store_lock.remove(&removal_key); + } + }; + + let exp_offset = custom_exp_offset.unwrap_or(self.expire_after); + return store_lock + .insert( + key, + TTLCacheItem { + value: value, + expire_offset: exp_offset, + }, + current_ts + exp_offset, + ) + .map(|v| v.value); + } + None } } @@ -158,53 +168,31 @@ mod tests { let caches = vec![cache, fs_cache]; block_on(async { for cache in caches { - cache - .insert("key".to_string(), "value".to_string(), None) - .await; - assert_eq!( - cache.get(&"key".to_string()).await, - Some("value".to_string()) - ); - cache - .insert("key1".to_string(), "value1".to_string(), None) - .await; - cache - .insert("key2".to_string(), "value2".to_string(), None) - .await; - assert_eq!(cache.get(&"key".to_string()).await, None); - cache - .insert("key3".to_string(), "value3".to_string(), None) - .await; - cache.get(&"key2".to_string()).await; - cache - .insert("key4".to_string(), "value4".to_string(), None) - .await; + cache.insert("key".to_string(), "value".to_string(), None); + assert_eq!(cache.get(&"key".to_string()), Some("value".to_string())); + cache.insert("key1".to_string(), "value1".to_string(), None); + cache.insert("key2".to_string(), "value2".to_string(), None); + assert_eq!(cache.get(&"key".to_string()), None); + cache.insert("key3".to_string(), "value3".to_string(), None); + cache.get(&"key2".to_string()); + cache.insert("key4".to_string(), "value4".to_string(), None); // key2 should not be evicted because of LRU assert_eq!( - cache.remove(&"key2".to_string()).await, + cache.remove(&"key2".to_string()), Some("value2".to_string()) ); // key3 should be evicted because it was bumped to back after key2 was accessed - assert_eq!(cache.get(&"key3".to_string()).await, None); - cache - .insert("key5".to_string(), "value5".to_string(), None) - .await; + assert_eq!(cache.get(&"key3".to_string()), None); + cache.insert("key5".to_string(), "value5".to_string(), None); thread::sleep(std::time::Duration::from_millis(6)); - assert_eq!(cache.get(&"key5".to_string()).await, None); + assert_eq!(cache.get(&"key5".to_string()), None); // test ttl config - cache - .insert("key6".to_string(), "value6".to_string(), Some(1)) - .await; - cache - .insert("key7".to_string(), "value7".to_string(), None) - .await; + cache.insert("key6".to_string(), "value6".to_string(), Some(1)); + cache.insert("key7".to_string(), "value7".to_string(), None); // wait until value6 expires thread::sleep(std::time::Duration::from_millis(1)); - assert_eq!(cache.get(&"key6".to_string()).await, None); - assert_eq!( - cache.get(&"key7".to_string()).await, - Some("value7".to_string()) - ); + assert_eq!(cache.get(&"key6".to_string()), None); + assert_eq!(cache.get(&"key7".to_string()), Some("value7".to_string())); } std::fs::remove_dir_all(cache_location).unwrap(); }); diff --git a/libindy_vdr/src/pool/helpers.rs b/libindy_vdr/src/pool/helpers.rs index e5f1aea0..ac2955b8 100644 --- a/libindy_vdr/src/pool/helpers.rs +++ b/libindy_vdr/src/pool/helpers.rs @@ -223,7 +223,7 @@ pub async fn perform_ledger_request( if is_read_req { if let Some(cache) = cache_opt.clone() { - if let Some((response, meta)) = cache.get(&cache_key).await { + if let Some((response, meta)) = cache.get(&cache_key) { return Ok((RequestResult::Reply(response), meta)); } } @@ -240,9 +240,7 @@ pub async fn perform_ledger_request( } } if let Some(cache) = cache_opt { - cache - .insert(cache_key, (response.to_string(), meta.clone()), None) - .await; + cache.insert(cache_key, (response.to_string(), meta.clone()), None); } } }