From 00bdcf5b305422ecfedc4fbea4421fbc1c01e715 Mon Sep 17 00:00:00 2001 From: Hubert Gruszecki Date: Thu, 13 Feb 2025 21:58:21 +0100 Subject: [PATCH] refactor(server): improve cache size calculation and memory tracking This commit refactors the cache size calculation by replacing the `LocalSizeable` trait with a new `RealSize` trait. The `RealSize` trait provides a more accurate calculation of the memory size of types, including overhead from containers like `Arc` and `Vec`. --- server/src/streaming/cache/buffer.rs | 24 +++++++++---------- server/src/streaming/local_sizeable.rs | 6 +++++ server/src/streaming/models/messages.rs | 32 +++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/server/src/streaming/cache/buffer.rs b/server/src/streaming/cache/buffer.rs index 8e3f3872c..67ebba600 100644 --- a/server/src/streaming/cache/buffer.rs +++ b/server/src/streaming/cache/buffer.rs @@ -1,4 +1,4 @@ -use crate::streaming::local_sizeable::LocalSizeable; +use crate::streaming::local_sizeable::RealSize; use super::memory_tracker::CacheMemoryTracker; use atone::Vc; @@ -11,17 +11,17 @@ use std::sync::{ }; #[derive(Debug)] -pub struct SmartCache { - current_size: IggyByteSize, +pub struct SmartCache { buffer: Vc, memory_tracker: Arc, + current_size: IggyByteSize, hits: AtomicU64, misses: AtomicU64, } impl SmartCache where - T: LocalSizeable + Clone + Debug, + T: RealSize + Clone + Debug, { pub fn new() -> Self { let current_size = IggyByteSize::default(); @@ -29,9 +29,9 @@ where let memory_tracker = CacheMemoryTracker::get_instance().unwrap(); Self { - current_size, buffer, memory_tracker, + current_size, hits: AtomicU64::new(0), misses: AtomicU64::new(0), } @@ -49,11 +49,11 @@ where /// removes the oldest elements until there's enough space for the new element. /// It's preferred to use `extend` instead of this method. pub fn push_safe(&mut self, element: T) { - let element_size = element.get_size_bytes(); + let element_size = element.real_size(); while !self.memory_tracker.will_fit_into_cache(element_size) { if let Some(oldest_element) = self.buffer.pop_front() { - let oldest_size = oldest_element.get_size_bytes(); + let oldest_size = oldest_element.real_size(); self.memory_tracker .decrement_used_memory(oldest_size.as_bytes_u64()); self.current_size -= oldest_size; @@ -74,7 +74,7 @@ where if removed_size >= size_to_remove { break; } - let elem_size = element.get_size_bytes(); + let elem_size = element.real_size(); self.memory_tracker .decrement_used_memory(elem_size.as_bytes_u64()); self.current_size -= elem_size; @@ -101,7 +101,7 @@ where /// even if it exceeds the memory limit. pub fn extend(&mut self, elements: impl IntoIterator) { let elements = elements.into_iter().inspect(|element| { - let element_size = element.get_size_bytes(); + let element_size = element.real_size(); self.memory_tracker .increment_used_memory(element_size.as_bytes_u64()); self.current_size += element_size; @@ -111,7 +111,7 @@ where /// Always appends the element into the buffer, even if it exceeds the memory limit. pub fn append(&mut self, element: T) { - let element_size = element.get_size_bytes(); + let element_size = element.real_size(); self.memory_tracker .increment_used_memory(element_size.as_bytes_u64()); self.current_size += element_size; @@ -154,7 +154,7 @@ where impl Index for SmartCache where - T: LocalSizeable + Clone + Debug, + T: RealSize + Clone + Debug, { type Output = T; @@ -163,7 +163,7 @@ where } } -impl Default for SmartCache { +impl Default for SmartCache { fn default() -> Self { Self::new() } diff --git a/server/src/streaming/local_sizeable.rs b/server/src/streaming/local_sizeable.rs index 9d121cb39..892739a22 100644 --- a/server/src/streaming/local_sizeable.rs +++ b/server/src/streaming/local_sizeable.rs @@ -5,3 +5,9 @@ use iggy::utils::byte_size::IggyByteSize; pub trait LocalSizeable { fn get_size_bytes(&self) -> IggyByteSize; } + +/// Trait for calculating the real memory size of a type, including all its fields +/// and any additional overhead from containers like Arc, Vec, etc. +pub trait RealSize { + fn real_size(&self) -> IggyByteSize; +} diff --git a/server/src/streaming/models/messages.rs b/server/src/streaming/models/messages.rs index 3dbf852c3..cef761f7e 100644 --- a/server/src/streaming/models/messages.rs +++ b/server/src/streaming/models/messages.rs @@ -1,4 +1,5 @@ use crate::streaming::local_sizeable::LocalSizeable; +use crate::streaming::local_sizeable::RealSize; use crate::streaming::models::COMPONENT; use bytes::{BufMut, Bytes, BytesMut}; use error_set::ErrContext; @@ -11,6 +12,7 @@ use iggy::utils::sizeable::Sizeable; use iggy::{messages::send_messages::Message, models::messages::MessageState}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::mem; use std::ops::Deref; use std::sync::Arc; @@ -152,6 +154,36 @@ impl Sizeable for RetainedMessage { } } +impl RealSize for RetainedMessage { + fn real_size(&self) -> IggyByteSize { + let mut total_size = 0; + + total_size += mem::size_of::(); // id + total_size += mem::size_of::(); // offset + total_size += mem::size_of::(); // timestamp + total_size += mem::size_of::(); // checksum + total_size += mem::size_of::(); // message_state + + total_size += mem::size_of::>(); // headers + if let Some(headers) = &self.headers { + total_size += headers.len(); // headers length + total_size += mem::size_of::() * 2; // Bytes overhead + } + + total_size += self.payload.len(); // payload length + total_size += mem::size_of::() * 2; // Bytes overhead + + IggyByteSize::from(total_size as u64) + } +} + +impl RealSize for Arc { + fn real_size(&self) -> IggyByteSize { + let arc_overhead = mem::size_of::() as u64 * 2; + self.deref().real_size() + IggyByteSize::from(arc_overhead) + } +} + impl LocalSizeable for T where T: Deref,