Skip to content

Commit

Permalink
Add "no_wait" mode for file synchronization (#1425)
Browse files Browse the repository at this point in the history
This PR introduces a new "no_wait" mode to the file synchronization
process. The "nowait" mode allows file operations to proceed without
waiting for certain synchronization steps to complete, improving
performance in scenarios where blocking is unnecessary
  • Loading branch information
haze518 authored Jan 9, 2025
1 parent 00f66ea commit 07bf86b
Show file tree
Hide file tree
Showing 36 changed files with 503 additions and 121 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,14 @@ path = "compatibility"
# `false` allows the OS to manage write operations, which can improve performance.
enforce_fsync = false

# Maximum number of retries for a failed file operation (e.g., append, overwrite).
# This defines how many times the system will attempt the operation before failing.
max_file_operation_retries = 1

# Delay between retries in case of a failed file operation.
# This helps to avoid immediate repeated attempts and can reduce load.
retry_delay = "1 s"

# Runtime configuration.
[system.runtime]
# Path for storing runtime data.
Expand Down Expand Up @@ -452,6 +460,13 @@ size = "1 GB"
# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration.
message_expiry = "none"

# Defines the file system confirmation behavior during state updates.
# Controls how the system waits for file write operations to complete.
# Possible values:
# - "wait": waits for the file operation to complete before proceeding.
# - "no_wait": proceeds without waiting for the file operation to finish, potentially increasing performance but at the cost of durability.
server_confirmation = "wait"

# Configures whether expired segments are archived (boolean) or just deleted without archiving.
archive_expired = false

Expand Down
15 changes: 9 additions & 6 deletions integration/tests/streaming/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;

let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
Expand Down Expand Up @@ -119,12 +120,12 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
let test_timestamp = IggyTimestamp::now();
partition
.append_messages(appendable_batch_info_two, messages_two)
.append_messages(appendable_batch_info_two, messages_two, None)
.await
.unwrap();

Expand Down Expand Up @@ -183,7 +184,8 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;

let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
Expand Down Expand Up @@ -229,7 +231,7 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
assert_eq!(partition.unsaved_messages_count, 0);
Expand All @@ -249,7 +251,8 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
now,
);
)
.await;
let partition_state = PartitionState {
id: partition.partition_id,
created_at: now,
Expand Down
17 changes: 11 additions & 6 deletions integration/tests/streaming/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ async fn should_persist_partition_with_segment() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;

partition.persist().await.unwrap();

Expand Down Expand Up @@ -66,7 +67,8 @@ async fn should_load_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path, with_segment).await;

Expand All @@ -85,7 +87,8 @@ async fn should_load_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
now,
);
)
.await;
let partition_state = PartitionState {
id: partition.partition_id,
created_at: now,
Expand Down Expand Up @@ -139,7 +142,8 @@ async fn should_delete_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path, with_segment).await;

Expand Down Expand Up @@ -172,7 +176,8 @@ async fn should_purge_existing_partition_on_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
)
.await;
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path, with_segment).await;
let messages = create_messages();
Expand All @@ -185,7 +190,7 @@ async fn should_purge_existing_partition_on_disk() {
partition.partition_id,
);
partition
.append_messages(appendable_batch_info, messages)
.append_messages(appendable_batch_info, messages, None)
.await
.unwrap();
let loaded_messages = partition.get_messages_by_offset(0, 100).await.unwrap();
Expand Down
117 changes: 107 additions & 10 deletions integration/tests/streaming/segment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::streaming::common::test_setup::TestSetup;
use bytes::Bytes;
use iggy::bytes_serializable::BytesSerializable;
use iggy::confirmation::Confirmation;
use iggy::models::messages::{MessageState, PolledMessage};
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
Expand All @@ -11,7 +12,9 @@ use server::streaming::segments::segment;
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio::time::sleep;

#[tokio::test]
async fn should_persist_segment() {
Expand All @@ -35,7 +38,8 @@ async fn should_persist_segment() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;

setup
.create_partition_directory(stream_id, topic_id, partition_id)
Expand Down Expand Up @@ -73,7 +77,8 @@ async fn should_load_existing_segment_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;
setup
.create_partition_directory(stream_id, topic_id, partition_id)
.await;
Expand All @@ -100,7 +105,8 @@ async fn should_load_existing_segment_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;
loaded_segment.load().await.unwrap();
let loaded_messages = loaded_segment.get_messages(0, 10).await.unwrap();

Expand Down Expand Up @@ -137,7 +143,91 @@ async fn should_persist_and_load_segment_with_messages() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;

setup
.create_partition_directory(stream_id, topic_id, partition_id)
.await;
segment.persist().await.unwrap();
assert_persisted_segment(
&setup
.config
.get_partition_path(stream_id, topic_id, partition_id),
start_offset,
)
.await;
let messages_count = 10;
let mut messages = Vec::new();
let mut batch_size = IggyByteSize::default();
for i in 0..messages_count {
let message = create_message(i, "test", IggyTimestamp::now());

let retained_message = Arc::new(RetainedMessage {
id: message.id,
offset: message.offset,
timestamp: message.timestamp,
checksum: message.checksum,
message_state: message.state,
headers: message.headers.map(|headers| headers.to_bytes()),
payload: message.payload.clone(),
});
batch_size += retained_message.get_size_bytes();
messages.push(retained_message);
}

segment
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages(None).await.unwrap();
let mut loaded_segment = segment::Segment::create(
stream_id,
topic_id,
partition_id,
start_offset,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
)
.await;
loaded_segment.load().await.unwrap();
let messages = loaded_segment
.get_messages(0, messages_count as u32)
.await
.unwrap();
assert_eq!(messages.len(), messages_count as usize);
}

#[tokio::test]
async fn should_persist_and_load_segment_with_messages_with_nowait_confirmation() {
let setup = TestSetup::init().await;
let stream_id = 1;
let topic_id = 2;
let partition_id = 3;
let start_offset = 0;
let mut segment = segment::Segment::create(
stream_id,
topic_id,
partition_id,
start_offset,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
)
.await;

setup
.create_partition_directory(stream_id, topic_id, partition_id)
Expand Down Expand Up @@ -173,7 +263,11 @@ async fn should_persist_and_load_segment_with_messages() {
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment
.persist_messages(Some(Confirmation::NoWait))
.await
.unwrap();
sleep(Duration::from_millis(200)).await;
let mut loaded_segment = segment::Segment::create(
stream_id,
topic_id,
Expand All @@ -188,7 +282,8 @@ async fn should_persist_and_load_segment_with_messages() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;
loaded_segment.load().await.unwrap();
let messages = loaded_segment
.get_messages(0, messages_count as u32)
Expand Down Expand Up @@ -220,7 +315,8 @@ async fn given_all_expired_messages_segment_should_be_expired() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;

setup
.create_partition_directory(stream_id, topic_id, partition_id)
Expand Down Expand Up @@ -258,7 +354,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(None).await.unwrap();

segment.is_closed = true;
let is_expired = segment.is_expired(now).await;
Expand Down Expand Up @@ -288,7 +384,8 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
);
)
.await;

setup
.create_partition_directory(stream_id, topic_id, partition_id)
Expand Down Expand Up @@ -343,7 +440,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
.append_batch(not_expired_message_size, 1, &not_expired_messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(None).await.unwrap();

let is_expired = segment.is_expired(now).await;
assert!(!is_expired);
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/streaming/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn should_purge_existing_stream_on_disk() {
.map(|msg| msg.get_size_bytes())
.sum::<IggyByteSize>();
topic
.append_messages(batch_size, Partitioning::partition_id(1), messages)
.append_messages(batch_size, Partitioning::partition_id(1), messages, None)
.await
.unwrap();
let loaded_messages = topic
Expand Down
Loading

0 comments on commit 07bf86b

Please sign in to comment.