Skip to content

Commit

Permalink
perf: don't iter evicted entries with noop disk cache (#834)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Jan 10, 2025
1 parent 170a410 commit bdf4846
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 73 deletions.
1 change: 1 addition & 0 deletions foyer-memory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ readme = { workspace = true }

[dependencies]
ahash = { workspace = true }
arc-swap = "1"
bitflags = "2"
cmsketch = "0.2.1"
equivalent = { workspace = true }
Expand Down
25 changes: 11 additions & 14 deletions foyer-memory/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ where
weighter: Arc<dyn Weighter<K, V>>,

event_listener: Option<Arc<dyn EventListener<Key = K, Value = V>>>,
pipe: Option<Arc<dyn Pipe<Key = K, Value = V>>>,

registry: BoxedRegistry,
metrics: Option<Arc<Metrics>>,
Expand All @@ -315,7 +314,6 @@ where
hash_builder: RandomState::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
pipe: None,

registry: Box::new(NoopMetricsRegistry),
metrics: None,
Expand Down Expand Up @@ -367,7 +365,6 @@ where
hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
pipe: self.pipe,
registry: self.registry,
metrics: self.metrics,
}
Expand All @@ -385,13 +382,6 @@ where
self
}

/// Set pipe.
#[doc(hidden)]
pub fn with_pipe(mut self, pipe: Arc<dyn Pipe<Key = K, Value = V>>) -> Self {
self.pipe = Some(pipe);
self
}

/// Set metrics registry.
///
/// Default: [`NoopMetricsRegistry`].
Expand Down Expand Up @@ -431,7 +421,6 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
pipe: self.pipe,
metrics,
}))),
EvictionConfig::S3Fifo(eviction_config) => Cache::S3Fifo(Arc::new(RawCache::new(RawCacheConfig {
Expand All @@ -441,7 +430,6 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
pipe: self.pipe,
metrics,
}))),
EvictionConfig::Lru(eviction_config) => Cache::Lru(Arc::new(RawCache::new(RawCacheConfig {
Expand All @@ -451,7 +439,6 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
pipe: self.pipe,
metrics,
}))),
EvictionConfig::Lfu(eviction_config) => Cache::Lfu(Arc::new(RawCache::new(RawCacheConfig {
Expand All @@ -461,7 +448,6 @@ where
hash_builder: self.hash_builder,
weighter: self.weighter,
event_listener: self.event_listener,
pipe: self.pipe,
metrics,
}))),
}
Expand Down Expand Up @@ -702,6 +688,17 @@ where
Cache::Lfu(cache) => cache.shards(),
}
}

/// Set the pipe for the hybrid cache.
#[doc(hidden)]
pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = K, Value = V>>) {
match self {
Cache::Fifo(cache) => cache.set_pipe(pipe),
Cache::S3Fifo(cache) => cache.set_pipe(pipe),
Cache::Lru(cache) => cache.set_pipe(pipe),
Cache::Lfu(cache) => cache.set_pipe(pipe),
}
}
}

/// A future that is used to get entry value from the remote storage for the in-memory cache.
Expand Down
32 changes: 31 additions & 1 deletion foyer-memory/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fmt::Debug, sync::Arc};
use std::{fmt::Debug, marker::PhantomData, sync::Arc};

use foyer_common::code::{Key, Value};

use crate::{record::Record, Eviction};

Expand Down Expand Up @@ -91,10 +93,38 @@ pub trait Pipe: Send + Sync + 'static {
/// Type of the value of the record.
type Value;

/// Decide whether to send evicted entry to pipe.
fn is_enabled(&self) -> bool;

/// Send the piece to the disk cache.
fn send(&self, piece: Piece<Self::Key, Self::Value>);
}

/// An no-op pipe that is never enabled.
#[derive(Debug)]
pub struct NoopPipe<K, V>(PhantomData<(K, V)>);

impl<K, V> Default for NoopPipe<K, V> {
fn default() -> Self {
Self(PhantomData)
}
}

impl<K, V> Pipe for NoopPipe<K, V>
where
K: Key,
V: Value,
{
type Key = K;
type Value = V;

fn is_enabled(&self) -> bool {
false
}

fn send(&self, _: Piece<Self::Key, Self::Value>) {}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
41 changes: 20 additions & 21 deletions foyer-memory/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{
task::{Context, Poll},
};

use arc_swap::ArcSwap;
use equivalent::Equivalent;
use fastrace::{
future::{FutureExt, InSpan},
Expand All @@ -46,6 +47,7 @@ use crate::{
error::{Error, Result},
eviction::{Eviction, Op},
indexer::{hash_table::HashTableIndexer, sentry::Sentry, Indexer},
pipe::NoopPipe,
record::{Data, Record},
Piece, Pipe,
};
Expand All @@ -67,7 +69,6 @@ where
pub hash_builder: S,
pub weighter: Arc<dyn Weighter<E::Key, E::Value>>,
pub event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
pub pipe: Option<Arc<dyn Pipe<Key = E::Key, Value = E::Value>>>,
pub metrics: Arc<Metrics>,
}

Expand Down Expand Up @@ -370,7 +371,7 @@ where

metrics: Arc<Metrics>,
event_listener: Option<Arc<dyn EventListener<Key = E::Key, Value = E::Value>>>,
pipe: Option<Arc<dyn Pipe<Key = E::Key, Value = E::Value>>>,
pipe: ArcSwap<Box<dyn Pipe<Key = E::Key, Value = E::Value>>>,
}

impl<E, S, I> RawCacheInner<E, S, I>
Expand Down Expand Up @@ -452,14 +453,16 @@ where
.map(RwLock::new)
.collect_vec();

let pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value>> = Box::new(NoopPipe::default());

let inner = RawCacheInner {
shards,
capacity: config.capacity,
hash_builder: Arc::new(config.hash_builder),
weighter: config.weighter,
metrics: config.metrics,
event_listener: config.event_listener,
pipe: config.pipe,
pipe: ArcSwap::new(Arc::new(pipe)),
};

Self { inner: Arc::new(inner) }
Expand All @@ -482,15 +485,15 @@ where
})
});
// Deallocate data out of the lock critical section.
if inner.event_listener.is_some() || inner.pipe.is_some() {
let pipe = inner.pipe.load();
let piped = pipe.is_enabled();
if inner.event_listener.is_some() || piped {
for (event, record) in garbages {
if let Some(listener) = inner.event_listener.as_ref() {
listener.on_leave(event, record.key(), record.value())
}
if event == Event::Evict {
if let Some(pipe) = inner.pipe.as_ref() {
pipe.send(Piece::new(record));
}
if piped && event == Event::Evict {
pipe.send(Piece::new(record));
}
}
}
Expand Down Expand Up @@ -564,15 +567,15 @@ where
}

// Deallocate data out of the lock critical section.
if self.inner.event_listener.is_some() || self.inner.pipe.is_some() {
let pipe = self.inner.pipe.load();
let piped = pipe.is_enabled();
if self.inner.event_listener.is_some() || piped {
for (event, record) in garbages {
if let Some(listener) = self.inner.event_listener.as_ref() {
listener.on_leave(event, record.key(), record.value())
}
if event == Event::Evict {
if let Some(pipe) = self.inner.pipe.as_ref() {
pipe.send(Piece::new(record));
}
if piped && event == Event::Evict {
pipe.send(Piece::new(record));
}
}
}
Expand Down Expand Up @@ -685,6 +688,10 @@ where
self.inner.shards.len()
}

pub fn set_pipe(&self, pipe: Box<dyn Pipe<Key = E::Key, Value = E::Value>>) {
self.inner.pipe.store(Arc::new(pipe));
}

fn shard(&self, hash: u64) -> usize {
hash as usize % self.inner.shards.len()
}
Expand Down Expand Up @@ -1034,7 +1041,6 @@ mod tests {
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
pipe: None,
metrics: Arc::new(Metrics::noop()),
})
}
Expand All @@ -1047,7 +1053,6 @@ mod tests {
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
pipe: None,
metrics: Arc::new(Metrics::noop()),
})
}
Expand All @@ -1060,7 +1065,6 @@ mod tests {
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
pipe: None,
metrics: Arc::new(Metrics::noop()),
})
}
Expand All @@ -1073,7 +1077,6 @@ mod tests {
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
pipe: None,
metrics: Arc::new(Metrics::noop()),
})
}
Expand Down Expand Up @@ -1178,7 +1181,6 @@ mod tests {
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
pipe: None,
metrics: Arc::new(Metrics::noop()),
});
let hints = vec![FifoHint];
Expand All @@ -1194,7 +1196,6 @@ mod tests {
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
pipe: None,
metrics: Arc::new(Metrics::noop()),
});
let hints = vec![S3FifoHint];
Expand All @@ -1210,7 +1211,6 @@ mod tests {
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
pipe: None,
metrics: Arc::new(Metrics::noop()),
});
let hints = vec![LruHint::HighPriority, LruHint::LowPriority];
Expand All @@ -1226,7 +1226,6 @@ mod tests {
hash_builder: Default::default(),
weighter: Arc::new(|_, _| 1),
event_listener: None,
pipe: None,
metrics: Arc::new(Metrics::noop()),
});
let hints = vec![LfuHint];
Expand Down
5 changes: 5 additions & 0 deletions foyer-storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,11 @@ where

Ok(store)
}

/// Return true the builder is based on a noop device.
pub fn is_noop(&self) -> bool {
matches! {self.device_options, DeviceOptions::None}
}
}

/// Large object disk cache engine default options.
Expand Down
Loading

0 comments on commit bdf4846

Please sign in to comment.