Skip to content

Commit

Permalink
fix(server): resolve deadlock during concurrent segment reads (#1539)
Browse files Browse the repository at this point in the history
This commit addresses a deadlock issue in file reading operations by
refactoring the code to use `Arc<File>` instead of `RwLock<Option<File>>`.

The changes ensure that file operations are performed in a non-blocking
manner using `spawn_blocking` and `read_exact_at`. This improves the
concurrency and efficiency of file access, particularly in scenarios
where multiple tasks are reading from the same file concurrently. The
commit also includes error handling improvements for better diagnostics
and robustness.

Besides that, `posix_fadvise(sequential)` is added for segment files
to improve page cache utilization.
  • Loading branch information
hubcio authored Feb 16, 2025
1 parent 01eaa15 commit 48ba0ce
Show file tree
Hide file tree
Showing 11 changed files with 335 additions and 355 deletions.
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.203"
version = "0.4.204"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down Expand Up @@ -45,6 +45,7 @@ iggy = { path = "../sdk" }
jsonwebtoken = "9.3.1"
mimalloc = { version = "0.1", optional = true }
moka = { version = "0.12.10", features = ["future"] }
nix = { version = "0.29", features = ["fs"] }
openssl = { version = "0.10.70", features = ["vendored"] }
opentelemetry = { version = "0.28.0", features = ["trace", "logs"] }
opentelemetry-appender-tracing = { version = "0.28.1", features = ["log"] }
Expand Down
4 changes: 2 additions & 2 deletions server/src/compat/index_rebuilding/index_rebuilder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::streaming::utils::file;
use crate::{
server_error::CompatError, streaming::batching::message_batch::RETAINED_BATCH_OVERHEAD,
server_error::CompatError, streaming::batching::message_batch::RETAINED_BATCH_HEADER_LEN,
};
use std::io::SeekFrom;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};
Expand Down Expand Up @@ -73,7 +73,7 @@ impl IndexRebuilder {
match Self::read_batch_header(&mut reader).await {
Ok(header) => {
// Calculate next position before writing current entry
next_position = position + RETAINED_BATCH_OVERHEAD as u32 + header.length;
next_position = position + RETAINED_BATCH_HEADER_LEN as u32 + header.length;

// Write index entry using current position
Self::write_index_entry(&mut writer, &header, position, self.start_offset)
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/batching/batch_accumulator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD};
use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_HEADER_LEN};
use crate::streaming::local_sizeable::LocalSizeable;
use crate::streaming::models::messages::RetainedMessage;
use bytes::BytesMut;
Expand Down Expand Up @@ -109,6 +109,6 @@ impl BatchAccumulator {

impl Sizeable for BatchAccumulator {
fn get_size_bytes(&self) -> IggyByteSize {
self.current_size + RETAINED_BATCH_OVERHEAD.into()
self.current_size + RETAINED_BATCH_HEADER_LEN.into()
}
}
4 changes: 2 additions & 2 deletions server/src/streaming/batching/message_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::streaming::models::messages::RetainedMessage;
use bytes::Bytes;
use iggy::utils::{byte_size::IggyByteSize, sizeable::Sizeable};

pub const RETAINED_BATCH_OVERHEAD: u64 = 8 + 8 + 4 + 4;
pub const RETAINED_BATCH_HEADER_LEN: u64 = 8 + 8 + 4 + 4;

#[derive(Debug)]
pub struct RetainedMessageBatch {
Expand Down Expand Up @@ -81,6 +81,6 @@ where

impl Sizeable for RetainedMessageBatch {
fn get_size_bytes(&self) -> IggyByteSize {
self.length + RETAINED_BATCH_OVERHEAD.into()
self.length + RETAINED_BATCH_HEADER_LEN.into()
}
}
Loading

0 comments on commit 48ba0ce

Please sign in to comment.