Skip to content

Commit

Permalink
perf(server): optimize writing performance (#1530)
Browse files Browse the repository at this point in the history
This commit optimizes the log writing process by removing the
`stream_position()` call, which uses `lseek` and negatively
impacts performance. Instead, the new implementation updates
the log file size directly using `fetch_add`, resulting
in a 40% improvement in write performance on Linux.
On macOS, the degradation was not visible.
  • Loading branch information
hubcio authored Feb 14, 2025
1 parent 49afa7b commit 4adba54
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 68 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions integration/tests/streaming/common/test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use server::configs::system::SystemConfig;
use server::streaming::persistence::persister::{FilePersister, PersisterKind};
use server::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use server::streaming::storage::SystemStorage;
use std::sync::Arc;
use tokio::fs;
Expand All @@ -17,10 +17,12 @@ impl TestSetup {

pub async fn init_with_config(mut config: SystemConfig) -> TestSetup {
config.path = format!("local_data_{}", Uuid::now_v7().to_u128_le());
config.partition.enforce_fsync = true;
config.state.enforce_fsync = true;

let config = Arc::new(config);
fs::create_dir(config.get_system_path()).await.unwrap();
let persister = PersisterKind::File(FilePersister {});
let persister = PersisterKind::FileWithSync(FileWithSyncPersister {});
let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister)));
TestSetup { config, storage }
}
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.202"
version = "0.4.203"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/partitions/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ mod tests {
use super::*;
use crate::configs::system::{MessageDeduplicationConfig, SystemConfig};
use crate::streaming::partitions::create_messages;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;

#[tokio::test]
Expand Down Expand Up @@ -648,7 +648,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

(
Expand Down
8 changes: 4 additions & 4 deletions server/src/streaming/partitions/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl fmt::Display for Partition {
mod tests {
use crate::configs::system::{CacheConfig, SystemConfig};
use crate::streaming::partitions::partition::Partition;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;
use iggy::utils::duration::IggyDuration;
use iggy::utils::expiry::IggyExpiry;
Expand All @@ -229,7 +229,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let stream_id = 1;
Expand Down Expand Up @@ -279,7 +279,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let partition = Partition::create(
Expand Down Expand Up @@ -316,7 +316,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let topic_id = 1;
Expand Down
22 changes: 6 additions & 16 deletions server/src/streaming/segments/logs/log_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
};
use tokio::{
fs::{File, OpenOptions},
io::{AsyncSeekExt, AsyncWriteExt},
io::AsyncWriteExt,
sync::RwLock,
};
use tracing::{error, trace};
Expand Down Expand Up @@ -101,8 +101,9 @@ impl SegmentLogWriter {
let batch_size = batch.get_size_bytes();
match confirmation {
Confirmation::Wait => {
let position = self.write_batch(batch).await?;
self.log_size_bytes.store(position, Ordering::Release);
self.write_batch(batch).await?;
self.log_size_bytes
.fetch_add(batch_size.as_bytes_u64(), Ordering::AcqRel);
trace!(
"Written batch of size {batch_size} bytes to log file: {}",
self.file_path
Expand All @@ -127,7 +128,7 @@ impl SegmentLogWriter {
}

/// Write a batch of bytes to the log file. The caller is responsible for updating the tracked log size.
async fn write_batch(&self, batch_to_write: RetainedMessageBatch) -> Result<u64, IggyError> {
async fn write_batch(&self, batch_to_write: RetainedMessageBatch) -> Result<(), IggyError> {
let mut file_guard = self.file.write().await;
if let Some(ref mut file) = *file_guard {
let header = batch_to_write.header_as_bytes();
Expand All @@ -141,18 +142,7 @@ impl SegmentLogWriter {
})
.map_err(|_| IggyError::CannotWriteToFile)?;

let new_position = file
.stream_position()
.await
.with_error_context(|e| {
format!(
"Failed to get position of file: {}, error: {e}",
self.file_path
)
})
.map_err(|_| IggyError::CannotReadFileMetadata)?;

Ok(new_position)
Ok(())
} else {
error!("File handle is not available for synchronous write.");
Err(IggyError::CannotWriteToFile)
Expand Down
52 changes: 22 additions & 30 deletions server/src/streaming/segments/logs/persister_task.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use crate::streaming::batching::message_batch::RetainedMessageBatch;
use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD};
use flume::{unbounded, Receiver};
use iggy::{error::IggyError, utils::duration::IggyDuration};
use std::{
io::IoSlice,
sync::{atomic::AtomicU64, Arc},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tokio::{
fs::File,
io::{AsyncSeekExt, AsyncWriteExt},
select,
time::sleep,
};
use tokio::{fs::File, io::AsyncWriteExt, select, time::sleep};
use tracing::{error, trace, warn};

#[derive(Debug)]
Expand All @@ -35,12 +33,12 @@ impl PersisterTask {
file: File,
file_path: String,
fsync: bool,
file_size: Arc<AtomicU64>,
log_file_size: Arc<AtomicU64>,
max_retries: u32,
retry_delay: IggyDuration,
) -> Self {
let (sender, receiver) = unbounded();
let file_size_clone = file_size.clone();
let log_file_size_clone = log_file_size.clone();
let file_path_clone = file_path.clone();
let handle = tokio::spawn(async move {
Self::run(
Expand All @@ -50,7 +48,7 @@ impl PersisterTask {
fsync,
max_retries,
retry_delay,
file_size_clone,
log_file_size_clone,
)
.await;
});
Expand Down Expand Up @@ -171,12 +169,12 @@ impl PersisterTask {
fsync: bool,
max_retries: u32,
retry_delay: IggyDuration,
file_size: Arc<AtomicU64>,
log_file_size: Arc<AtomicU64>,
) {
while let Ok(request) = receiver.recv_async().await {
match request {
PersisterTaskCommand::WriteRequest(batch_to_write) => {
if let Err(e) = Self::write_with_retries(
match Self::write_with_retries(
&mut file,
&file_path,
batch_to_write,
Expand All @@ -186,21 +184,14 @@ impl PersisterTask {
)
.await
{
error!(
Ok(bytes_written) => {
log_file_size.fetch_add(bytes_written, Ordering::AcqRel);
}
Err(e) => {
error!(
"Failed to persist data in LogPersisterTask for file {file_path}: {:?}",
e
);
} else {
match file.stream_position().await {
Ok(pos) => {
file_size.store(pos, std::sync::atomic::Ordering::Release);
}
Err(e) => {
error!(
"Failed to get file stream position in LogPersisterTask for file {file_path}: {:?}",
e
);
}
)
}
}
}
Expand All @@ -227,18 +218,19 @@ impl PersisterTask {
fsync: bool,
max_retries: u32,
retry_delay: IggyDuration,
) -> Result<(), IggyError> {
) -> Result<u64, IggyError> {
let header = batch_to_write.header_as_bytes();
let batch_bytes = batch_to_write.bytes;
let slices = [IoSlice::new(&header), IoSlice::new(&batch_bytes)];
let bytes_written = RETAINED_BATCH_OVERHEAD + batch_bytes.len() as u64;

let mut attempts = 0;
loop {
match file.write_vectored(&slices).await {
Ok(_) => {
if fsync {
match file.sync_all().await {
Ok(_) => return Ok(()),
Ok(_) => return Ok(bytes_written),
Err(e) => {
attempts += 1;
error!(
Expand All @@ -248,7 +240,7 @@ impl PersisterTask {
}
}
} else {
return Ok(());
return Ok(bytes_written);
}
}
Err(e) => {
Expand All @@ -265,7 +257,7 @@ impl PersisterTask {
);
return Err(IggyError::CannotWriteToFile);
}
tokio::time::sleep(retry_delay.get_duration()).await;
sleep(retry_delay.get_duration()).await;
}
}
}
4 changes: 2 additions & 2 deletions server/src/streaming/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Display for Stream {
#[cfg(test)]
mod tests {
use super::*;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};

#[test]
fn should_be_created_given_valid_parameters() {
Expand All @@ -90,7 +90,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let id = 1;
let name = "test";
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/streams/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ mod tests {
use crate::{
configs::system::SystemConfig,
streaming::{
persistence::persister::{FilePersister, PersisterKind},
persistence::persister::{FileWithSyncPersister, PersisterKind},
storage::SystemStorage,
},
};
Expand All @@ -262,7 +262,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let stream_id = 1;
let stream_name = "test_stream";
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/systems/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ mod tests {
use crate::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig};
use crate::configs::system::SystemConfig;
use crate::state::{MockState, StateKind};
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;
use crate::streaming::users::user::User;
use iggy::users::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
Expand All @@ -418,7 +418,7 @@ mod tests {
});
let storage = SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
);

let stream_id = 1;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/topics/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl Topic {
mod tests {
use super::*;
use crate::configs::system::SystemConfig;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use crate::streaming::storage::SystemStorage;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::utils::expiry::IggyExpiry;
Expand Down Expand Up @@ -358,7 +358,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let stream_id = 1;
let id = 2;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/topics/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl Topic {
mod tests {
use super::*;
use crate::configs::system::SystemConfig;
use crate::streaming::persistence::persister::FilePersister;
use crate::streaming::persistence::persister::FileWithSyncPersister;
use crate::streaming::persistence::persister::PersisterKind;
use crate::streaming::storage::SystemStorage;
use bytes::Bytes;
Expand Down Expand Up @@ -434,7 +434,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
let stream_id = 1;
let id = 2;
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/topics/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ impl fmt::Display for Topic {
#[cfg(test)]
mod tests {
use super::*;
use crate::streaming::persistence::persister::{FilePersister, PersisterKind};
use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind};
use iggy::locking::IggySharedMutFn;
use std::str::FromStr;

Expand All @@ -277,7 +277,7 @@ mod tests {
});
let storage = Arc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::File(FilePersister {})),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));

let stream_id = 1;
Expand Down

0 comments on commit 4adba54

Please sign in to comment.