Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support TTL and TTI jitter #453

Closed
chebbyChefNEQ opened this issue Aug 24, 2024 · 3 comments · Fixed by #489
Closed

Support TTL and TTI jitter #453

chebbyChefNEQ opened this issue Aug 24, 2024 · 3 comments · Fixed by #489

Comments

@chebbyChefNEQ
Copy link

Hi, I absolutely love the project. We use it in many places.

Would it be possible to allow jitter on the durations? We use moka for various RPC caching. When many entries expire at the same time, we end up sending a big spike to the upstream. I want to jitter ttl duration to combat this issue. I'd love to contribute too, but probably gonna need a few pointers.

@tatsuya6502
Copy link
Member

Hi. I am glad to hear that you guys are using moka and happy with it!

Would it be possible to allow jitter on the durations?

Yes, and there is no need to extend moka. You can use moka::Expiry trait (doc) to implement a custom expiry policy. Here is an example:

// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12.8", features = ["sync"] }
// rand = "0.8.5"

use std::time::{Duration, Instant};

use moka::{sync::Cache, Expiry};
use rand::{
    distributions::{Distribution, Uniform},
    Rng,
};

/// A `moka::Expiry` implementation that adds jitter to the expiry duration.
pub struct JitteredExpiry<J> {
    /// Optional time-to-live duration.
    time_to_live: Option<Duration>,
    /// Optional time-to-idle duration.
    time_to_idle: Option<Duration>,
    /// The distribution to randomly generate the jitter. The jitter is added to
    /// or subtracted from the expiry duration.
    jitter_gen: J,
}

impl<J> JitteredExpiry<J>
where
    J: Distribution<Duration>,
{
    pub fn new(
        time_to_live: Option<Duration>,
        time_to_idle: Option<Duration>,
        jitter_gen: J,
    ) -> Self {
        Self {
            time_to_live,
            time_to_idle,
            jitter_gen,
        }
    }

    /// Calculates the expiry duration after a write operation.
    pub fn calc_expiry_for_write(&self) -> Option<Duration> {
        if matches!((self.time_to_live, self.time_to_idle), (None, None)) {
            return None;
        }

        let expiry = match (self.time_to_live, self.time_to_idle) {
            (Some(ttl), None) => ttl,
            (None, Some(tti)) => tti,
            (Some(ttl), Some(tti)) => ttl.min(tti),
            (None, None) => unreachable!(),
        };

        Some(self.add_jitter(expiry))
    }

    /// Calculates the expiry duration after a read operation.
    pub fn calc_expiry_for_read(&self, read_at: Instant, modified_at: Instant) -> Option<Duration> {
        if matches!((self.time_to_live, self.time_to_idle), (None, None)) {
            return None;
        }

        let expiry = match (self.time_to_live, self.time_to_idle) {
            (Some(ttl), None) => {
                let elapsed = Self::elapsed_since_write(read_at, modified_at);
                Self::remaining_to_ttl(ttl, elapsed)
            }
            (None, Some(tti)) => tti,
            (Some(ttl), Some(tti)) => {
                // Ensure that the expiry duration does not exceed the
                // time-to-live since last write.
                let elapsed = Self::elapsed_since_write(read_at, modified_at);
                let remaining = Self::remaining_to_ttl(ttl, elapsed);
                tti.min(remaining)
            }
            (None, None) => unreachable!(),
        };

        Some(self.add_jitter(expiry))
    }

    /// Calculates the elapsed time between `modified_at` and `read_at`.
    fn elapsed_since_write(read_at: Instant, modified_at: Instant) -> Duration {
        // NOTE: `duration_since` panics if `read_at` is earlier than `modified_at`.
        if read_at >= modified_at {
            read_at.duration_since(modified_at)
        } else {
            Duration::default() // zero duration
        }
    }

    /// Calculates the remaining time to live based on the `ttl` and `elapsed` time.
    fn remaining_to_ttl(ttl: Duration, elapsed: Duration) -> Duration {
        ttl.saturating_sub(elapsed)
    }

    /// Adds jitter to the given duration.
    fn add_jitter(&self, duration: Duration) -> Duration {
        let mut rng = rand::thread_rng();
        let jitter = self.jitter_gen.sample(&mut rng);

        // Add or subtract the jitter to/from the duration.
        if rng.gen() {
            duration.saturating_add(jitter)
        } else {
            duration.saturating_sub(jitter)
        }
    }
}

/// The implementation of the `moka::Expiry` trait for `JitteredExpiry`.
/// https://docs.rs/moka/latest/moka/policy/trait.Expiry.html
impl<K, V, J> Expiry<K, V> for JitteredExpiry<J>
where
    J: Distribution<Duration>,
{
    /// Specifies that the entry should be automatically removed from the cache
    /// once the duration has elapsed after the entry’s creation. This method is
    /// called for cache write methods such as `insert` and `get_with` but only
    /// when the key was not present in the cache.
    fn expire_after_create(&self, _key: &K, _value: &V, _created_at: Instant) -> Option<Duration> {
        dbg!(self.calc_expiry_for_write())
    }

    /// Specifies that the entry should be automatically removed from the cache
    /// once the duration has elapsed after the replacement of its value. This
    /// method is called for cache write methods such as `insert` but only when
    /// the key is already present in the cache.
    fn expire_after_update(
        &self,
        _key: &K,
        _value: &V,
        _updated_at: Instant,
        duration_until_expiry: Option<Duration>,
    ) -> Option<Duration> {
        dbg!(self.calc_expiry_for_write().or(duration_until_expiry))
    }

    /// Specifies that the entry should be automatically removed from the cache
    /// once the duration has elapsed after its last read. This method is called
    /// for cache read methods such as `get` and `get_with` but only when the
    /// key is present in the cache.
    fn expire_after_read(
        &self,
        _key: &K,
        _value: &V,
        read_at: Instant,
        duration_until_expiry: Option<Duration>,
        last_modified_at: Instant,
    ) -> Option<Duration> {
        dbg!(self
            .calc_expiry_for_read(read_at, last_modified_at)
            .or(duration_until_expiry))
    }
}

fn main() {
    let expiry = JitteredExpiry::new(
        // TTL 10 minutes
        Some(Duration::from_secs(10 * 60)),
        // TTI 3 minutes
        Some(Duration::from_secs(3 * 60)),
        // Jitter +/- 30 seconds, 1 second resolution, uniformly distributed
        Uniform::from(0..30).map(Duration::from_secs),
    );

    let cache = Cache::builder().expire_after(expiry).build();

    const NUM_KEYS: usize = 10;

    // Insert some key-value pairs.
    for key in 0..NUM_KEYS {
        cache.insert(key, format!("value-{key}"));
    }

    // Get all entries.
    for key in 0..NUM_KEYS {
        assert_eq!(cache.get(&key), Some(format!("value-{key}")));
    }

    // Update all entries.
    for key in 0..NUM_KEYS {
        cache.insert(key, format!("new-value-{key}"));
    }
}

I did not test the code, so please let me know if you encounter any issues. If it works for you, I will add it to the examples directory.

@chebbyChefNEQ
Copy link
Author

Thank you so much!

@tatsuya6502
Copy link
Member

I am adding the example jittered_expiry_policy_sync to the example directory via #489.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants