Skip to content

Commit

Permalink
Merge pull request #114 from moka-rs/iterator
Browse files Browse the repository at this point in the history
Add `iter` method to create an iterator to `unsync`, `sync` and `future` caches
  • Loading branch information
tatsuya6502 authored Apr 10, 2022
2 parents 8d7a6a2 + 2f0a316 commit aedb98c
Show file tree
Hide file tree
Showing 15 changed files with 1,041 additions and 35 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Moka — Change Log

## Version 0.8.2

### Added

- Add iterator to the following caches: ([#114][gh-pull-0114])
- `sync::Cache`
- `sync::SegmentedCache`
- `future::Cache`
- `unsync::Cache`
- Implement `IntoIterator` to the all caches (including experimental `dash::Cache`)
([#114][gh-pull-0114])


## Version 0.8.1

### Added
Expand Down Expand Up @@ -287,6 +300,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).
[gh-issue-0038]: https://github.com/moka-rs/moka/issues/38/
[gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/

[gh-pull-0114]: https://github.com/moka-rs/moka/pull/114/
[gh-pull-0105]: https://github.com/moka-rs/moka/pull/105/
[gh-pull-0104]: https://github.com/moka-rs/moka/pull/104/
[gh-pull-0103]: https://github.com/moka-rs/moka/pull/103/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.8.1"
version = "0.8.2"
edition = "2018"
rust-version = "1.51"

Expand Down
27 changes: 27 additions & 0 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,33 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {

loop_result.returned().flatten()
}

pub(crate) fn keys<F, T>(
&self,
guard: &'g Guard,
with_key: &mut F,
) -> Result<Vec<T>, RelocatedError>
where
F: FnMut(&K) -> T,
{
let mut keys = Vec::new();

for bucket in self.buckets.iter() {
let bucket_ptr = bucket.load_consume(guard);

if is_sentinel(bucket_ptr) {
return Err(RelocatedError);
}

if let Some(bucket_ref) = unsafe { bucket_ptr.as_ref() } {
if !is_tombstone(bucket_ptr) {
keys.push(with_key(&bucket_ref.key));
}
}
}

Ok(keys)
}
}

impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
Expand Down
28 changes: 28 additions & 0 deletions src/cht/map/bucket_array_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,34 @@ where

result
}

pub(crate) fn keys<F, T>(&self, mut with_key: F) -> Vec<T>
where
F: FnMut(&K) -> T,
{
let guard = &crossbeam_epoch::pin();
let current_ref = self.get(guard);
let mut bucket_array_ref = current_ref;

let result;

loop {
match bucket_array_ref.keys(guard, &mut with_key) {
Ok(keys) => {
result = keys;
break;
}
Err(_) => {
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
}
}
}

self.swing(guard, current_ref, bucket_array_ref);

result
}
}

impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> {
Expand Down
71 changes: 71 additions & 0 deletions src/cht/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,28 @@ impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {
result
}

pub(crate) fn keys<F, T>(&self, segment: usize, with_key: F) -> Option<Vec<T>>
where
F: FnMut(&K) -> T,
{
if segment >= self.segments.len() {
return None;
}

let Segment {
ref bucket_array,
ref len,
} = self.segments[segment];

let bucket_array_ref = BucketArrayRef {
bucket_array,
build_hasher: &self.build_hasher,
len,
};

Some(bucket_array_ref.keys(with_key))
}

#[inline]
pub(crate) fn hash<Q>(&self, key: &Q) -> u64
where
Expand All @@ -538,6 +560,10 @@ impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {
{
bucket::hash(&self.build_hasher, key)
}

pub(crate) fn actual_num_segments(&self) -> usize {
self.segments.len()
}
}

impl<K, V, S> Drop for HashMap<K, V, S> {
Expand Down Expand Up @@ -1610,4 +1636,49 @@ mod tests {

run_deferred();
}

#[test]
fn keys_in_single_segment() {
let map =
HashMap::with_num_segments_capacity_and_hasher(1, 0, DefaultHashBuilder::default());

assert!(map.is_empty());
assert_eq!(map.len(), 0);

const NUM_KEYS: usize = 200;

for i in 0..NUM_KEYS {
let key = Arc::new(i);
let hash = map.hash(&key);
assert_eq!(map.insert_entry_and(key, hash, i, |_, v| *v), None);
}

assert!(!map.is_empty());
assert_eq!(map.len(), NUM_KEYS);

let mut keys = map.keys(0, |k| Arc::clone(k)).unwrap();
assert_eq!(keys.len(), NUM_KEYS);
keys.sort_unstable();

for (i, key) in keys.into_iter().enumerate() {
assert_eq!(i, *key);
}

for i in (0..NUM_KEYS).step_by(2) {
assert_eq!(map.remove(&i, map.hash(&i)), Some(i));
}

assert!(!map.is_empty());
assert_eq!(map.len(), NUM_KEYS / 2);

let mut keys = map.keys(0, |k| Arc::clone(k)).unwrap();
assert_eq!(keys.len(), NUM_KEYS / 2);
keys.sort_unstable();

for (i, key) in keys.into_iter().enumerate() {
assert_eq!(i, *key / 2);
}

run_deferred();
}
}
56 changes: 43 additions & 13 deletions src/dash/cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS},
CacheBuilder, ConcurrentCacheExt, Iter,
CacheBuilder, ConcurrentCacheExt, EntryRef, Iter,
};
use crate::{
sync::{housekeeper::InnerSync, Weigher, WriteOp},
Expand All @@ -25,9 +25,6 @@ use std::{
/// Since `DashMap` employs read-write locks on internal shards, it will have lower
/// concurrency on retrievals and updates than other caches.
///
/// On the other hand, `dash` cache provides iterator, which returns immutable
/// references to the entries in a cache. Other caches do not provide iterator.
///
/// `dash` cache performs a best-effort bounding of the map using an entry
/// replacement algorithm to determine which entries to evict when the capacity is
/// exceeded.
Expand Down Expand Up @@ -436,13 +433,24 @@ where
V: 'a,
S: BuildHasher + Clone,
{
/// Creates an iterator over a `moka::dash::Cache` yielding immutable references.
/// Creates an iterator visiting all key-value pairs in arbitrary order. The
/// iterator element type is [`EntryRef<'a, K, V, S>`][moka-entry-ref].
///
/// Unlike the `get` method, visiting entries via an iterator do not update the
/// historic popularity estimator or reset idle timers for keys.
///
/// # Guarantees
///
/// **TODO**
///
/// # Locking behavior
///
/// **Locking behavior**: This iterator relies on the iterator of
/// [`dashmap::DashMap`][dashmap-iter], which employs read-write locks. May
/// deadlock if the thread holding an iterator attempts to update the cache.
/// This iterator relies on the iterator of [`dashmap::DashMap`][dashmap-iter],
/// which employs read-write locks. May deadlock if the thread holding an
/// iterator attempts to update the cache.
///
/// [dashmap-iter]: https://docs.rs/dashmap/5.2.0/dashmap/struct.DashMap.html#method.iter
/// [moka-entry-ref]: ./struct.EntryRef.html
/// [dashmap-iter]: <https://docs.rs/dashmap/*/dashmap/struct.DashMap.html#method.iter>
///
/// # Examples
///
Expand Down Expand Up @@ -478,6 +486,21 @@ where
}
}

impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
where
K: 'a + Eq + Hash,
V: 'a,
S: BuildHasher + Clone,
{
type Item = EntryRef<'a, K, V, S>;

type IntoIter = Iter<'a, K, V, S>;

fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

// private methods
impl<K, V, S> Cache<K, V, S>
where
Expand Down Expand Up @@ -902,7 +925,7 @@ mod tests {

let mut key_set = std::collections::HashSet::new();

for entry in cache.iter() {
for entry in &cache {
let (key, value) = entry.pair();
assert_eq!(value, &make_value(*key));

Expand Down Expand Up @@ -934,7 +957,10 @@ mod tests {
///
#[test]
fn test_iter_multi_threads() {
use std::collections::HashSet;

const NUM_KEYS: usize = 1024;
const NUM_THREADS: usize = 16;

fn make_value(key: usize) -> String {
format!("val: {}", key)
Expand All @@ -955,7 +981,7 @@ mod tests {

// https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
#[allow(clippy::needless_collect)]
let handles = (0..16usize)
let handles = (0..NUM_THREADS)
.map(|n| {
let cache = cache.clone();
let rw_lock = Arc::clone(&rw_lock);
Expand All @@ -974,8 +1000,8 @@ mod tests {
// This thread will iterate the cache.
std::thread::spawn(move || {
let read_lock = rw_lock.read().unwrap();
let mut key_set = std::collections::HashSet::new();
for entry in cache.iter() {
let mut key_set = HashSet::new();
for entry in &cache {
let (key, value) = entry.pair();
assert_eq!(value, &make_value(*key));
key_set.insert(*key);
Expand All @@ -992,6 +1018,10 @@ mod tests {
std::mem::drop(write_lock);

handles.into_iter().for_each(|h| h.join().expect("Failed"));

// Ensure there are no missing or duplicate keys in the iteration.
let key_set = cache.iter().map(|ent| *ent.key()).collect::<HashSet<_>>();
assert_eq!(key_set.len(), NUM_KEYS);
}

#[test]
Expand Down
Loading

0 comments on commit aedb98c

Please sign in to comment.