Skip to content

Commit

Permalink
Add an example called reinsert_expired_entries_sync
Browse files Browse the repository at this point in the history
  • Loading branch information
tatsuya6502 committed Jan 19, 2024
1 parent 9303de3 commit d206c68
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ required-features = ["sync"]
name = "eviction_listener_sync"
required-features = ["sync"]

[[example]]
name = "reinsert_expired_entries_sync"
required-features = ["sync"]

[[example]]
name = "size_aware_eviction_sync"
required-features = ["sync"]
Expand Down
8 changes: 8 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ existence of the entry.
- Beside the cache APIs, uses `BTreeMap`, `Arc` and mpsc channel (multi-producer,
single consumer channel).

- [reinsert_expired_entries_sync](./reinsert_expired_enties_sync.rs)
- Reinserts the expired entries into the cache using eviction listener and
worker threads.
- Spawns two worker threads; one for reinserting entries, and the other for
calling `run_pending_tasks`.
- Uses a mpsc channel (multi-producer, single consumer channel) to send commands
from the eviction listener to the first worker thread.

## Check out the API Documentation too!

The examples are not meant to be exhaustive. Please check the
Expand Down
153 changes: 153 additions & 0 deletions examples/reinsert_expired_entries_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//! This example demonstrates how to write an eviction listener that will reinsert
//! the expired entries.
//!
//! We cannot make the eviction listener directly reinsert the entries, because it
//! will lead to a deadlock in some conditions. Instead, we will create a worker
//! thread to do the reinsertion, and create a mpsc channel to send commands from the
//! eviction listener to the worker thread.
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, Sender},
Arc, Mutex, OnceLock,

Check failure on line 13 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'

Check failure on line 13 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'
},
thread,
time::Duration,
};

use moka::{notification::RemovalCause, sync::Cache};
use quanta::Instant;

Check failure on line 20 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (beta)

unresolved import `quanta`

Check failure on line 20 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (stable)

unresolved import `quanta`

Check failure on line 20 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

unresolved import `quanta`

Check failure on line 20 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (stable)

unresolved import `quanta`

Check failure on line 20 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (beta)

unresolved import `quanta`

/// The cache key type.
pub type Key = String;
/// The cache value type.
pub type Value = u32;

/// Command for the worker thread.
pub enum Command {
/// (Re)insert the entry with the given key and value.
Insert(Key, Value),
/// Shutdown the worker thread.
Shutdown,
}

fn main() {
// Create a multi-producer single-consumer (mpsc) channel to send commands
// from the eviction listener to the worker thread.
let (snd, rcv) = mpsc::channel();

// Wrap the Sender (snd) with a Mutex and set to a static OnceLock.
//
// Cache requires an eviction listener to be Sync as it will be executed by
// multiple threads. However the Sender (snd) of the channel is not Sync, so the
// eviction listener cannot capture the Sender directly.
//
// We are going to solve this by making the Sender globally accessible via the
// static OnceLock, and make the eviction listener to clone it per thread.
static SND: OnceLock<Mutex<Sender<Command>>> = OnceLock::new();

Check failure on line 48 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'

Check failure on line 48 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'

Check failure on line 48 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'

Check failure on line 48 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'

Check failure on line 48 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'

Check failure on line 48 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'
SND.set(Mutex::new(snd.clone())).unwrap();

Check failure on line 49 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'

Check failure on line 49 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'

// Create the eviction listener.
let listener = move |key: Arc<String>, value: u32, cause: RemovalCause| {
// Keep a clone of the Sender in our thread-local variable, so that we can
// send a command without locking the Mutex every time.
thread_local! {
static THREAD_SND: Sender<Command> = SND.get().unwrap().lock().unwrap().clone();

Check failure on line 56 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'

Check failure on line 56 in examples/reinsert_expired_entries_sync.rs

View workflow job for this annotation

GitHub Actions / test (1.65.0)

use of unstable library feature 'once_cell'
}

println!("{} was evicted. value: {} ({:?})", key, value, cause);

// If the entry was removed due to expiration, send a command to the channel
// to reinsert the entry with a modified value.
if cause == RemovalCause::Expired {
let new_value = value * 2;
let command = Command::Insert(key.to_string(), new_value);
THREAD_SND.with(|snd| snd.send(command).expect("Cannot send"));
}

// Do nothing if the entry was removed by one of the following reasons:
// - Reached to the capacity limit. (RemovalCause::Size)
// - Manually invalidated. (RemovalCause::Explicit)
};

const MAX_CAPACITY: u64 = 7;
const TTL: Duration = Duration::from_secs(3);

// Create a cache with the max capacity, time-to-live and the eviction listener.
let cache = Arc::new(
Cache::builder()
.max_capacity(MAX_CAPACITY)
.time_to_live(TTL)
.eviction_listener(listener)
.build(),
);

// Spawn the worker thread that receives commands from the channel and reinserts
// the entries.
let worker1 = {
let cache = Arc::clone(&cache);

thread::spawn(move || {
// Repeat until receiving a shutdown command.
loop {
match rcv.recv() {
Ok(Command::Insert(key, value)) => {
println!("Reinserting {} with value {}.", key, value);
cache.insert(key, value);
}
Ok(Command::Shutdown) => break,
Err(e) => {
eprintln!("Cannot receive a command: {:?}", e);
break;
}
}
}

println!("Shutdown the worker thread.");
})
};

// Spawn another worker thread that calls `cache.run_pending_tasks()` every 300
// milliseconds.
let shutdown = Arc::new(AtomicBool::new(false));
let worker2 = {
let cache = Arc::clone(&cache);
let shutdown = Arc::clone(&shutdown);

thread::spawn(move || {
let interval = Duration::from_millis(300);
let mut sleep_duration = interval;

// Repeat until the shutdown latch is set.
while !shutdown.load(Ordering::Relaxed) {
thread::sleep(sleep_duration);
let start = Instant::now();
cache.run_pending_tasks();
sleep_duration = interval.saturating_sub(start.elapsed());
}
})
};

// Insert 9 entries.
// - The last 2 entries will be evicted due to the capacity limit.
// - The remaining 7 entries will be evicted after 3 seconds, and then the worker
// thread will reinsert them with modified values.
for i in 1..=9 {
thread::sleep(Duration::from_millis(100));
let key = i.to_string();
let value = i;
println!("Inserting {} with value {}.", key, value);
cache.insert(key, value);
}

// Wait for 8 seconds.
thread::sleep(Duration::from_secs(8));

// Shutdown the worker threads.
snd.send(Command::Shutdown).expect("Cannot send");
worker1.join().expect("The worker thread 1 panicked");

shutdown.store(true, Ordering::Relaxed);
worker2.join().expect("The worker thread 2 panicked");
}

0 comments on commit d206c68

Please sign in to comment.