-
-
Notifications
You must be signed in to change notification settings - Fork 75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support TTL and TTI jitter #453
Comments
Hi. I am glad to hear that you guys are using
Yes, and there is no need to extend // Cargo.toml
//
// [dependencies]
// moka = { version = "0.12.8", features = ["sync"] }
// rand = "0.8.5"
use std::time::{Duration, Instant};
use moka::{sync::Cache, Expiry};
use rand::{
distributions::{Distribution, Uniform},
Rng,
};
/// A `moka::Expiry` implementation that adds jitter to the expiry duration.
pub struct JitteredExpiry<J> {
/// Optional time-to-live duration.
time_to_live: Option<Duration>,
/// Optional time-to-idle duration.
time_to_idle: Option<Duration>,
/// The distribution to randomly generate the jitter. The jitter is added to
/// or subtracted from the expiry duration.
jitter_gen: J,
}
impl<J> JitteredExpiry<J>
where
J: Distribution<Duration>,
{
pub fn new(
time_to_live: Option<Duration>,
time_to_idle: Option<Duration>,
jitter_gen: J,
) -> Self {
Self {
time_to_live,
time_to_idle,
jitter_gen,
}
}
/// Calculates the expiry duration after a write operation.
pub fn calc_expiry_for_write(&self) -> Option<Duration> {
if matches!((self.time_to_live, self.time_to_idle), (None, None)) {
return None;
}
let expiry = match (self.time_to_live, self.time_to_idle) {
(Some(ttl), None) => ttl,
(None, Some(tti)) => tti,
(Some(ttl), Some(tti)) => ttl.min(tti),
(None, None) => unreachable!(),
};
Some(self.add_jitter(expiry))
}
/// Calculates the expiry duration after a read operation.
pub fn calc_expiry_for_read(&self, read_at: Instant, modified_at: Instant) -> Option<Duration> {
if matches!((self.time_to_live, self.time_to_idle), (None, None)) {
return None;
}
let expiry = match (self.time_to_live, self.time_to_idle) {
(Some(ttl), None) => {
let elapsed = Self::elapsed_since_write(read_at, modified_at);
Self::remaining_to_ttl(ttl, elapsed)
}
(None, Some(tti)) => tti,
(Some(ttl), Some(tti)) => {
// Ensure that the expiry duration does not exceed the
// time-to-live since last write.
let elapsed = Self::elapsed_since_write(read_at, modified_at);
let remaining = Self::remaining_to_ttl(ttl, elapsed);
tti.min(remaining)
}
(None, None) => unreachable!(),
};
Some(self.add_jitter(expiry))
}
/// Calculates the elapsed time between `modified_at` and `read_at`.
fn elapsed_since_write(read_at: Instant, modified_at: Instant) -> Duration {
// NOTE: `duration_since` panics if `read_at` is earlier than `modified_at`.
if read_at >= modified_at {
read_at.duration_since(modified_at)
} else {
Duration::default() // zero duration
}
}
/// Calculates the remaining time to live based on the `ttl` and `elapsed` time.
fn remaining_to_ttl(ttl: Duration, elapsed: Duration) -> Duration {
ttl.saturating_sub(elapsed)
}
/// Adds jitter to the given duration.
fn add_jitter(&self, duration: Duration) -> Duration {
let mut rng = rand::thread_rng();
let jitter = self.jitter_gen.sample(&mut rng);
// Add or subtract the jitter to/from the duration.
if rng.gen() {
duration.saturating_add(jitter)
} else {
duration.saturating_sub(jitter)
}
}
}
/// The implementation of the `moka::Expiry` trait for `JitteredExpiry`.
/// https://docs.rs/moka/latest/moka/policy/trait.Expiry.html
impl<K, V, J> Expiry<K, V> for JitteredExpiry<J>
where
J: Distribution<Duration>,
{
/// Specifies that the entry should be automatically removed from the cache
/// once the duration has elapsed after the entry’s creation. This method is
/// called for cache write methods such as `insert` and `get_with` but only
/// when the key was not present in the cache.
fn expire_after_create(&self, _key: &K, _value: &V, _created_at: Instant) -> Option<Duration> {
dbg!(self.calc_expiry_for_write())
}
/// Specifies that the entry should be automatically removed from the cache
/// once the duration has elapsed after the replacement of its value. This
/// method is called for cache write methods such as `insert` but only when
/// the key is already present in the cache.
fn expire_after_update(
&self,
_key: &K,
_value: &V,
_updated_at: Instant,
duration_until_expiry: Option<Duration>,
) -> Option<Duration> {
dbg!(self.calc_expiry_for_write().or(duration_until_expiry))
}
/// Specifies that the entry should be automatically removed from the cache
/// once the duration has elapsed after its last read. This method is called
/// for cache read methods such as `get` and `get_with` but only when the
/// key is present in the cache.
fn expire_after_read(
&self,
_key: &K,
_value: &V,
read_at: Instant,
duration_until_expiry: Option<Duration>,
last_modified_at: Instant,
) -> Option<Duration> {
dbg!(self
.calc_expiry_for_read(read_at, last_modified_at)
.or(duration_until_expiry))
}
}
fn main() {
let expiry = JitteredExpiry::new(
// TTL 10 minutes
Some(Duration::from_secs(10 * 60)),
// TTI 3 minutes
Some(Duration::from_secs(3 * 60)),
// Jitter +/- 30 seconds, 1 second resolution, uniformly distributed
Uniform::from(0..30).map(Duration::from_secs),
);
let cache = Cache::builder().expire_after(expiry).build();
const NUM_KEYS: usize = 10;
// Insert some key-value pairs.
for key in 0..NUM_KEYS {
cache.insert(key, format!("value-{key}"));
}
// Get all entries.
for key in 0..NUM_KEYS {
assert_eq!(cache.get(&key), Some(format!("value-{key}")));
}
// Update all entries.
for key in 0..NUM_KEYS {
cache.insert(key, format!("new-value-{key}"));
}
} I did not test the code, so please let me know if you encounter any issues. If it works for you, I will add it to the |
Thank you so much! |
I am adding the example |
Hi, I absolutely love the project. We use it in many places.
Would it be possible to allow jitter on the durations? We use moka for various RPC caching. When many entries expire at the same time, we end up sending a big spike to the upstream. I want to jitter ttl duration to combat this issue. I'd love to contribute too, but probably gonna need a few pointers.
The text was updated successfully, but these errors were encountered: