Skip to content

Commit

Permalink
refactor(server): improve cache size calculation and memory tracking (#…
Browse files Browse the repository at this point in the history
…1529)

This commit refactors the cache size calculation by replacing the
`LocalSizeable` trait with a new `RealSize` trait. The `RealSize`
trait provides a more accurate calculation of the memory size of
types, including overhead from containers like `Arc` and `Vec`.
  • Loading branch information
hubcio authored Feb 14, 2025
1 parent 683e98e commit 44de98c
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.200"
version = "0.4.201"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down
24 changes: 12 additions & 12 deletions server/src/streaming/cache/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::streaming::local_sizeable::LocalSizeable;
use crate::streaming::local_sizeable::RealSize;

use super::memory_tracker::CacheMemoryTracker;
use atone::Vc;
Expand All @@ -11,27 +11,27 @@ use std::sync::{
};

#[derive(Debug)]
pub struct SmartCache<T: LocalSizeable + Debug> {
current_size: IggyByteSize,
pub struct SmartCache<T: RealSize + Debug> {
buffer: Vc<T>,
memory_tracker: Arc<CacheMemoryTracker>,
current_size: IggyByteSize,
hits: AtomicU64,
misses: AtomicU64,
}

impl<T> SmartCache<T>
where
T: LocalSizeable + Clone + Debug,
T: RealSize + Clone + Debug,
{
pub fn new() -> Self {
let current_size = IggyByteSize::default();
let buffer = Vc::new();
let memory_tracker = CacheMemoryTracker::get_instance().unwrap();

Self {
current_size,
buffer,
memory_tracker,
current_size,
hits: AtomicU64::new(0),
misses: AtomicU64::new(0),
}
Expand All @@ -49,11 +49,11 @@ where
/// removes the oldest elements until there's enough space for the new element.
/// It's preferred to use `extend` instead of this method.
pub fn push_safe(&mut self, element: T) {
let element_size = element.get_size_bytes();
let element_size = element.real_size();

while !self.memory_tracker.will_fit_into_cache(element_size) {
if let Some(oldest_element) = self.buffer.pop_front() {
let oldest_size = oldest_element.get_size_bytes();
let oldest_size = oldest_element.real_size();
self.memory_tracker
.decrement_used_memory(oldest_size.as_bytes_u64());
self.current_size -= oldest_size;
Expand All @@ -74,7 +74,7 @@ where
if removed_size >= size_to_remove {
break;
}
let elem_size = element.get_size_bytes();
let elem_size = element.real_size();
self.memory_tracker
.decrement_used_memory(elem_size.as_bytes_u64());
self.current_size -= elem_size;
Expand All @@ -101,7 +101,7 @@ where
/// even if it exceeds the memory limit.
pub fn extend(&mut self, elements: impl IntoIterator<Item = T>) {
let elements = elements.into_iter().inspect(|element| {
let element_size = element.get_size_bytes();
let element_size = element.real_size();
self.memory_tracker
.increment_used_memory(element_size.as_bytes_u64());
self.current_size += element_size;
Expand All @@ -111,7 +111,7 @@ where

/// Always appends the element into the buffer, even if it exceeds the memory limit.
pub fn append(&mut self, element: T) {
let element_size = element.get_size_bytes();
let element_size = element.real_size();
self.memory_tracker
.increment_used_memory(element_size.as_bytes_u64());
self.current_size += element_size;
Expand Down Expand Up @@ -154,7 +154,7 @@ where

impl<T> Index<usize> for SmartCache<T>
where
T: LocalSizeable + Clone + Debug,
T: RealSize + Clone + Debug,
{
type Output = T;

Expand All @@ -163,7 +163,7 @@ where
}
}

impl<T: LocalSizeable + Clone + Debug> Default for SmartCache<T> {
impl<T: RealSize + Clone + Debug> Default for SmartCache<T> {
fn default() -> Self {
Self::new()
}
Expand Down
6 changes: 6 additions & 0 deletions server/src/streaming/local_sizeable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ use iggy::utils::byte_size::IggyByteSize;
pub trait LocalSizeable {
fn get_size_bytes(&self) -> IggyByteSize;
}

/// Trait for calculating the real memory size of a type, including all its fields
/// and any additional overhead from containers like Arc, Vec, etc.
pub trait RealSize {
fn real_size(&self) -> IggyByteSize;
}
32 changes: 32 additions & 0 deletions server/src/streaming/models/messages.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::streaming::local_sizeable::LocalSizeable;
use crate::streaming::local_sizeable::RealSize;
use crate::streaming::models::COMPONENT;
use bytes::{BufMut, Bytes, BytesMut};
use error_set::ErrContext;
Expand All @@ -11,6 +12,7 @@ use iggy::utils::sizeable::Sizeable;
use iggy::{messages::send_messages::Message, models::messages::MessageState};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::mem;
use std::ops::Deref;
use std::sync::Arc;

Expand Down Expand Up @@ -152,6 +154,36 @@ impl Sizeable for RetainedMessage {
}
}

impl RealSize for RetainedMessage {
fn real_size(&self) -> IggyByteSize {
let mut total_size = 0;

total_size += mem::size_of::<u128>(); // id
total_size += mem::size_of::<u64>(); // offset
total_size += mem::size_of::<u64>(); // timestamp
total_size += mem::size_of::<u32>(); // checksum
total_size += mem::size_of::<MessageState>(); // message_state

total_size += mem::size_of::<Option<Bytes>>(); // headers
if let Some(headers) = &self.headers {
total_size += headers.len(); // headers length
total_size += mem::size_of::<Bytes>() * 2; // Bytes overhead
}

total_size += self.payload.len(); // payload length
total_size += mem::size_of::<Bytes>() * 2; // Bytes overhead

IggyByteSize::from(total_size as u64)
}
}

impl RealSize for Arc<RetainedMessage> {
fn real_size(&self) -> IggyByteSize {
let arc_overhead = mem::size_of::<usize>() as u64 * 2;
self.deref().real_size() + IggyByteSize::from(arc_overhead)
}
}

impl<T> LocalSizeable for T
where
T: Deref<Target = RetainedMessage>,
Expand Down

0 comments on commit 44de98c

Please sign in to comment.