Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(server): improve cache size calculation and memory tracking #1529

Merged
merged 2 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.200"
version = "0.4.201"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down
24 changes: 12 additions & 12 deletions server/src/streaming/cache/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::streaming::local_sizeable::LocalSizeable;
use crate::streaming::local_sizeable::RealSize;

use super::memory_tracker::CacheMemoryTracker;
use atone::Vc;
Expand All @@ -11,27 +11,27 @@ use std::sync::{
};

#[derive(Debug)]
pub struct SmartCache<T: LocalSizeable + Debug> {
current_size: IggyByteSize,
pub struct SmartCache<T: RealSize + Debug> {
buffer: Vc<T>,
memory_tracker: Arc<CacheMemoryTracker>,
current_size: IggyByteSize,
hits: AtomicU64,
misses: AtomicU64,
}

impl<T> SmartCache<T>
where
T: LocalSizeable + Clone + Debug,
T: RealSize + Clone + Debug,
{
pub fn new() -> Self {
let current_size = IggyByteSize::default();
let buffer = Vc::new();
let memory_tracker = CacheMemoryTracker::get_instance().unwrap();

Self {
current_size,
buffer,
memory_tracker,
current_size,
hits: AtomicU64::new(0),
misses: AtomicU64::new(0),
}
Expand All @@ -49,11 +49,11 @@ where
/// removes the oldest elements until there's enough space for the new element.
/// It's preferred to use `extend` instead of this method.
pub fn push_safe(&mut self, element: T) {
let element_size = element.get_size_bytes();
let element_size = element.real_size();

while !self.memory_tracker.will_fit_into_cache(element_size) {
if let Some(oldest_element) = self.buffer.pop_front() {
let oldest_size = oldest_element.get_size_bytes();
let oldest_size = oldest_element.real_size();
self.memory_tracker
.decrement_used_memory(oldest_size.as_bytes_u64());
self.current_size -= oldest_size;
Expand All @@ -74,7 +74,7 @@ where
if removed_size >= size_to_remove {
break;
}
let elem_size = element.get_size_bytes();
let elem_size = element.real_size();
self.memory_tracker
.decrement_used_memory(elem_size.as_bytes_u64());
self.current_size -= elem_size;
Expand All @@ -101,7 +101,7 @@ where
/// even if it exceeds the memory limit.
pub fn extend(&mut self, elements: impl IntoIterator<Item = T>) {
let elements = elements.into_iter().inspect(|element| {
let element_size = element.get_size_bytes();
let element_size = element.real_size();
self.memory_tracker
.increment_used_memory(element_size.as_bytes_u64());
self.current_size += element_size;
Expand All @@ -111,7 +111,7 @@ where

/// Always appends the element into the buffer, even if it exceeds the memory limit.
pub fn append(&mut self, element: T) {
let element_size = element.get_size_bytes();
let element_size = element.real_size();
self.memory_tracker
.increment_used_memory(element_size.as_bytes_u64());
self.current_size += element_size;
Expand Down Expand Up @@ -154,7 +154,7 @@ where

impl<T> Index<usize> for SmartCache<T>
where
T: LocalSizeable + Clone + Debug,
T: RealSize + Clone + Debug,
{
type Output = T;

Expand All @@ -163,7 +163,7 @@ where
}
}

impl<T: LocalSizeable + Clone + Debug> Default for SmartCache<T> {
impl<T: RealSize + Clone + Debug> Default for SmartCache<T> {
fn default() -> Self {
Self::new()
}
Expand Down
6 changes: 6 additions & 0 deletions server/src/streaming/local_sizeable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ use iggy::utils::byte_size::IggyByteSize;
pub trait LocalSizeable {
fn get_size_bytes(&self) -> IggyByteSize;
}

/// Trait for calculating the real memory size of a type, including all its fields
/// and any additional overhead from containers like Arc, Vec, etc.
pub trait RealSize {
fn real_size(&self) -> IggyByteSize;
}
32 changes: 32 additions & 0 deletions server/src/streaming/models/messages.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::streaming::local_sizeable::LocalSizeable;
use crate::streaming::local_sizeable::RealSize;
use crate::streaming::models::COMPONENT;
use bytes::{BufMut, Bytes, BytesMut};
use error_set::ErrContext;
Expand All @@ -11,6 +12,7 @@ use iggy::utils::sizeable::Sizeable;
use iggy::{messages::send_messages::Message, models::messages::MessageState};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::mem;
use std::ops::Deref;
use std::sync::Arc;

Expand Down Expand Up @@ -152,6 +154,36 @@ impl Sizeable for RetainedMessage {
}
}

impl RealSize for RetainedMessage {
fn real_size(&self) -> IggyByteSize {
let mut total_size = 0;

total_size += mem::size_of::<u128>(); // id
total_size += mem::size_of::<u64>(); // offset
total_size += mem::size_of::<u64>(); // timestamp
total_size += mem::size_of::<u32>(); // checksum
total_size += mem::size_of::<MessageState>(); // message_state

total_size += mem::size_of::<Option<Bytes>>(); // headers
if let Some(headers) = &self.headers {
total_size += headers.len(); // headers length
total_size += mem::size_of::<Bytes>() * 2; // Bytes overhead
}

total_size += self.payload.len(); // payload length
total_size += mem::size_of::<Bytes>() * 2; // Bytes overhead

IggyByteSize::from(total_size as u64)
}
}

impl RealSize for Arc<RetainedMessage> {
fn real_size(&self) -> IggyByteSize {
let arc_overhead = mem::size_of::<usize>() as u64 * 2;
self.deref().real_size() + IggyByteSize::from(arc_overhead)
}
}

impl<T> LocalSizeable for T
where
T: Deref<Target = RetainedMessage>,
Expand Down