From 733037ca429c22ce511d9a16efff8a552c71911d Mon Sep 17 00:00:00 2001 From: Piotr Gankiewicz Date: Wed, 4 Dec 2024 10:45:48 +0100 Subject: [PATCH] Return remaining messages in batch when auto commit is disabled (#1375) --- Cargo.lock | 2 +- sdk/Cargo.toml | 2 +- sdk/src/clients/consumer.rs | 19 ++++++++++++------- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5531a6482..e3c6352e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2249,7 +2249,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.50" +version = "0.6.51" dependencies = [ "aes-gcm", "ahash 0.8.11", diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 71a253330..570c859cb 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.50" +version = "0.6.51" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" diff --git a/sdk/src/clients/consumer.rs b/sdk/src/clients/consumer.rs index f01308215..edc57e046 100644 --- a/sdk/src/clients/consumer.rs +++ b/sdk/src/clients/consumer.rs @@ -505,7 +505,7 @@ impl IggyConsumer { ) .await; - if let Ok(polled_messages) = polled_messages { + if let Ok(mut polled_messages) = polled_messages { if polled_messages.messages.is_empty() { return Ok(polled_messages); } @@ -522,12 +522,17 @@ impl IggyConsumer { last_consumed_offset.insert(partition_id, AtomicU64::new(0)); } - if has_consumed_offset && consumed_offset >= polled_messages.messages[0].offset { - return Ok(PolledMessages { - messages: EMPTY_MESSAGES, - current_offset: polled_messages.current_offset, - partition_id, - }); + if has_consumed_offset { + polled_messages + .messages + .retain(|message| message.offset > consumed_offset); + if polled_messages.messages.is_empty() { + return Ok(PolledMessages { + messages: EMPTY_MESSAGES, + current_offset: polled_messages.current_offset, + partition_id, + }); + } } let stored_offset;