From b304b4535466f5e669de802dd1b2453429ffab3d Mon Sep 17 00:00:00 2001 From: "marcin.bobinski" Date: Mon, 23 Dec 2024 12:06:03 +0100 Subject: [PATCH 1/6] SKYEDEN-3020 | commiting offsets change on retransmission --- .../consumers/consumer/BatchConsumer.java | 8 ++-- .../hermes/consumers/consumer/Consumer.java | 4 +- .../consumers/consumer/SerialConsumer.java | 6 +-- .../consumer/batch/MessageBatchReceiver.java | 6 +-- .../broker/KafkaConsumerOffsetMover.java | 40 ++++++++++++++++++- .../consumer/receiver/MessageReceiver.java | 4 +- .../receiver/ThrottlingMessageReceiver.java | 6 +-- .../UninitializedMessageReceiver.java | 4 +- .../kafka/FilteringMessageReceiver.java | 6 +-- .../KafkaSingleThreadedMessageReceiver.java | 5 ++- .../supervisor/process/Retransmitter.java | 40 ++++++------------- .../supervisor/process/ConsumerStub.groovy | 6 +-- 12 files changed, 80 insertions(+), 55 deletions(-) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java index 9f73598e31..32615448d8 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java @@ -22,7 +22,7 @@ import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch; @@ -239,11 +239,11 @@ public void commit(Set offsetsToCommit) { } @Override - public boolean moveOffset(PartitionOffset partitionOffset) { + public PartitionOffsets moveOffset(PartitionOffsets partitionOffsets) { if (receiver != null) { - return receiver.moveOffset(partitionOffset); + return receiver.moveOffset(partitionOffsets); } - return false; + return new PartitionOffsets(); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java index 5e88be3da4..40e4d96169 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java @@ -3,7 +3,7 @@ import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; public interface Consumer { @@ -26,7 +26,7 @@ public interface Consumer { void commit(Set offsets); - boolean moveOffset(PartitionOffset subscriptionPartitionOffset); + PartitionOffsets moveOffset(PartitionOffsets subscriptionPartitionOffsets); Subscription getSubscription(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java index 0897a48e34..e09bce278e 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java @@ -11,7 +11,7 @@ import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.CommonConsumerParameters; import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver; @@ -262,8 +262,8 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { - return messageReceiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return messageReceiver.moveOffset(offsets); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java index 1011bf9b14..2672f16a62 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper; import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException; import pl.allegro.tech.hermes.consumers.consumer.Message; @@ -180,7 +180,7 @@ public void commit(Set offsets) { receiver.commit(offsets); } - public boolean moveOffset(PartitionOffset offset) { - return receiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return receiver.moveOffset(offsets); } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java index 7f26db83f2..610489cec6 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java @@ -1,11 +1,15 @@ package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker; +import java.util.LinkedHashMap; +import java.util.Map; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; public class KafkaConsumerOffsetMover { @@ -19,7 +23,21 @@ public KafkaConsumerOffsetMover(SubscriptionName subscriptionName, KafkaConsumer this.consumer = consumer; } - public boolean move(PartitionOffset offset) { + public PartitionOffsets move(PartitionOffsets offsets) { + PartitionOffsets movedOffsets = new PartitionOffsets(); + + for (PartitionOffset offset : offsets) { + if (move(offset)) { + movedOffsets.add(offset); + } + } + + commit(movedOffsets); + + return movedOffsets; + } + + private boolean move(PartitionOffset offset) { try { TopicPartition tp = new TopicPartition(offset.getTopic().asString(), offset.getPartition()); if (consumer.assignment().contains(tp)) { @@ -46,4 +64,24 @@ public boolean move(PartitionOffset offset) { return false; } } + + private void commit(PartitionOffsets partitionOffsets) { + try { + Map offsetsToCommit = new LinkedHashMap<>(); + for (PartitionOffset partitionOffset : partitionOffsets) { + offsetsToCommit.put( + new TopicPartition( + partitionOffset.getTopic().asString(), partitionOffset.getPartition()), + new OffsetAndMetadata(partitionOffset.getOffset())); + } + if (!offsetsToCommit.isEmpty()) { + consumer.commitSync(offsetsToCommit); + } + } catch (Exception e) { + logger.error( + "Failed to commit offsets while trying to move them for subscription {}", + subscriptionName, + e); + } + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java index 797539028a..edbc774a71 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java @@ -3,7 +3,7 @@ import java.util.Optional; import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; @@ -33,5 +33,5 @@ default void update(Subscription newSubscription) {} void commit(Set offsets); - boolean moveOffset(PartitionOffset offset); + PartitionOffsets moveOffset(PartitionOffsets offsets); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java index 7abe865d26..94d4d196ab 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java @@ -4,7 +4,7 @@ import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionName; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.idletime.IdleTimeCalculator; @@ -53,8 +53,8 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { - return receiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return receiver.moveOffset(offsets); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java index 591a24384d..146000d738 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java @@ -2,7 +2,7 @@ import java.util.Optional; import java.util.Set; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; @@ -18,7 +18,7 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { + public PartitionOffsets moveOffset(PartitionOffsets offsets) { throw new ConsumerNotInitializedException(); } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java index 67ac4f0b6a..8d0dc30765 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java @@ -4,7 +4,7 @@ import java.util.Optional; import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.filtering.FilteredMessageHandler; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; @@ -65,7 +65,7 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { - return receiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return receiver.moveOffset(offsets); } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java index e256371bc3..a9237864b6 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java @@ -26,6 +26,7 @@ import pl.allegro.tech.hermes.common.kafka.KafkaTopic; import pl.allegro.tech.hermes.common.kafka.KafkaTopics; import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder; @@ -223,7 +224,7 @@ private Map createOffset( } @Override - public boolean moveOffset(PartitionOffset offset) { - return offsetMover.move(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return offsetMover.move(offsets); } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java index 3ccd141f89..1f259bfa93 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java @@ -30,36 +30,22 @@ public void reloadOffsets(SubscriptionName subscriptionName, Consumer consumer) subscriptionOffsetChangeIndicator.getSubscriptionOffsets( subscriptionName.getTopicName(), subscriptionName.getName(), brokersClusterName); - for (PartitionOffset partitionOffset : offsets) { - if (moveOffset(subscriptionName, consumer, partitionOffset)) { - subscriptionOffsetChangeIndicator.removeOffset( - subscriptionName.getTopicName(), - subscriptionName.getName(), - brokersClusterName, - partitionOffset.getTopic(), - partitionOffset.getPartition()); - logger.info( - "Removed offset indicator for subscription={} and partition={}", - subscriptionName, - partitionOffset.getPartition()); - } + PartitionOffsets movedOffsets = consumer.moveOffset(offsets); + + for (PartitionOffset partitionOffset : movedOffsets) { + subscriptionOffsetChangeIndicator.removeOffset( + subscriptionName.getTopicName(), + subscriptionName.getName(), + brokersClusterName, + partitionOffset.getTopic(), + partitionOffset.getPartition()); + logger.info( + "Removed offset indicator for subscription={} and partition={}", + subscriptionName, + partitionOffset.getPartition()); } } catch (Exception ex) { throw new RetransmissionException(ex); } } - - private boolean moveOffset( - SubscriptionName subscriptionName, Consumer consumer, PartitionOffset partitionOffset) { - try { - return consumer.moveOffset(partitionOffset); - } catch (IllegalStateException ex) { - logger.warn( - "Cannot move offset for subscription={} and partition={} , possibly owned by different node", - subscriptionName, - partitionOffset.getPartition(), - ex); - return false; - } - } } diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy index ae3a213ac0..50608d0c8b 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy @@ -2,7 +2,7 @@ package pl.allegro.tech.hermes.consumers.supervisor.process import pl.allegro.tech.hermes.api.Subscription import pl.allegro.tech.hermes.api.Topic -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets import pl.allegro.tech.hermes.consumers.consumer.Consumer import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset @@ -73,8 +73,8 @@ class ConsumerStub implements Consumer { } @Override - boolean moveOffset(PartitionOffset partitionOffset) { - return true + PartitionOffsets moveOffset(PartitionOffsets partitionOffset) { + return partitionOffset } boolean getInitialized() { From 94fd5634a284fba954666fdb2148f5fb7f2ba02a Mon Sep 17 00:00:00 2001 From: "marcin.bobinski" Date: Mon, 23 Dec 2024 14:33:33 +0100 Subject: [PATCH 2/6] SKYEDEN-3020 | prevent overriding commited offset with lower offset --- .../KafkaSingleThreadedMessageReceiver.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java index a9237864b6..81102b4da7 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java @@ -25,7 +25,6 @@ import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper; import pl.allegro.tech.hermes.common.kafka.KafkaTopic; import pl.allegro.tech.hermes.common.kafka.KafkaTopics; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.consumer.Message; @@ -196,15 +195,24 @@ public void commit(Set offsets) { private Map createOffset( Set partitionOffsets) { + Map commitedOffsetsData = + fetchCommitedOffsetsMetadata(partitionOffsets); + Map offsetsData = new LinkedHashMap<>(); for (SubscriptionPartitionOffset partitionOffset : partitionOffsets) { TopicPartition topicAndPartition = new TopicPartition( partitionOffset.getKafkaTopicName().asString(), partitionOffset.getPartition()); + Long commitedOffset = + Optional.ofNullable(commitedOffsetsData.get(topicAndPartition)) + .map(OffsetAndMetadata::offset) + .orElse(Long.MIN_VALUE); + if (partitionAssignmentState.isAssignedPartitionAtCurrentTerm( partitionOffset.getSubscriptionPartition())) { - if (consumer.position(topicAndPartition) >= partitionOffset.getOffset()) { + if (consumer.position(topicAndPartition) >= partitionOffset.getOffset() + && partitionOffset.getOffset() > commitedOffset) { offsetsData.put(topicAndPartition, new OffsetAndMetadata(partitionOffset.getOffset())); } else { skippedCounter.increment(); @@ -227,4 +235,17 @@ private Map createOffset( public PartitionOffsets moveOffset(PartitionOffsets offsets) { return offsetMover.move(offsets); } + + private Map fetchCommitedOffsetsMetadata( + Set partitionOffsets) { + Set topicPartitions = + partitionOffsets.stream() + .map( + offset -> + new TopicPartition( + offset.getKafkaTopicName().asString(), offset.getPartition())) + .collect(Collectors.toSet()); + + return consumer.committed(topicPartitions); + } } From 1c302b6fa6d19c9d849878cdf86669d8dcd7fda9 Mon Sep 17 00:00:00 2001 From: "marcin.bobinski" Date: Tue, 7 Jan 2025 12:39:48 +0100 Subject: [PATCH 3/6] SKYEDEN-3020 | tests + new approach --- .../common/kafka/offset/PartitionOffsets.java | 4 +++ .../ConsumerPartitionAssignmentState.java | 2 +- .../broker/KafkaConsumerOffsetMover.java | 14 +++++++++- .../KafkaSingleThreadedMessageReceiver.java | 27 +++---------------- .../client/integration/HermesTestClient.java | 2 +- .../KafkaRetransmissionServiceTest.java | 14 ++++++++++ 6 files changed, 37 insertions(+), 26 deletions(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java index c0c5b1ef30..4a9e552d62 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java @@ -22,4 +22,8 @@ public PartitionOffsets addAll(PartitionOffsets offsets) { public Iterator iterator() { return offsets.iterator(); } + + public boolean isEmpty() { + return offsets.isEmpty(); + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java index e6c318f369..b6214905dc 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java @@ -35,7 +35,7 @@ public void assign(SubscriptionName name, Collection partitions) { })); } - private void incrementTerm(SubscriptionName name) { + public void incrementTerm(SubscriptionName name) { terms.compute(name, ((subscriptionName, term) -> term == null ? 0L : term + 1L)); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java index 610489cec6..ef3028ffe2 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java @@ -10,6 +10,7 @@ import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; +import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState; public class KafkaConsumerOffsetMover { @@ -17,10 +18,15 @@ public class KafkaConsumerOffsetMover { private final SubscriptionName subscriptionName; private KafkaConsumer consumer; + private ConsumerPartitionAssignmentState partitionAssignmentState; - public KafkaConsumerOffsetMover(SubscriptionName subscriptionName, KafkaConsumer consumer) { + public KafkaConsumerOffsetMover( + SubscriptionName subscriptionName, + KafkaConsumer consumer, + ConsumerPartitionAssignmentState partitionAssignmentState) { this.subscriptionName = subscriptionName; this.consumer = consumer; + this.partitionAssignmentState = partitionAssignmentState; } public PartitionOffsets move(PartitionOffsets offsets) { @@ -34,6 +40,12 @@ public PartitionOffsets move(PartitionOffsets offsets) { commit(movedOffsets); + if (!movedOffsets.isEmpty()) { + // Incrementing assignment term ensures that currently committed offsets won't be overwritten + // by the events from the past which are concurrently processed by the consumer + partitionAssignmentState.incrementTerm(subscriptionName); + } + return movedOffsets; } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java index 81102b4da7..0baf30fd9a 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java @@ -74,7 +74,9 @@ public KafkaSingleThreadedMessageReceiver( this.partitionAssignmentState = partitionAssignmentState; this.consumer = consumer; this.readQueue = new ArrayBlockingQueue<>(readQueueCapacity); - this.offsetMover = new KafkaConsumerOffsetMover(subscription.getQualifiedName(), consumer); + this.offsetMover = + new KafkaConsumerOffsetMover( + subscription.getQualifiedName(), consumer, partitionAssignmentState); Map topics = getKafkaTopics(topic, kafkaNamesMapper).stream() .collect(Collectors.toMap(t -> t.name().asString(), Function.identity())); @@ -195,8 +197,6 @@ public void commit(Set offsets) { private Map createOffset( Set partitionOffsets) { - Map commitedOffsetsData = - fetchCommitedOffsetsMetadata(partitionOffsets); Map offsetsData = new LinkedHashMap<>(); for (SubscriptionPartitionOffset partitionOffset : partitionOffsets) { @@ -204,15 +204,9 @@ private Map createOffset( new TopicPartition( partitionOffset.getKafkaTopicName().asString(), partitionOffset.getPartition()); - Long commitedOffset = - Optional.ofNullable(commitedOffsetsData.get(topicAndPartition)) - .map(OffsetAndMetadata::offset) - .orElse(Long.MIN_VALUE); - if (partitionAssignmentState.isAssignedPartitionAtCurrentTerm( partitionOffset.getSubscriptionPartition())) { - if (consumer.position(topicAndPartition) >= partitionOffset.getOffset() - && partitionOffset.getOffset() > commitedOffset) { + if (consumer.position(topicAndPartition) >= partitionOffset.getOffset()) { offsetsData.put(topicAndPartition, new OffsetAndMetadata(partitionOffset.getOffset())); } else { skippedCounter.increment(); @@ -235,17 +229,4 @@ private Map createOffset( public PartitionOffsets moveOffset(PartitionOffsets offsets) { return offsetMover.move(offsets); } - - private Map fetchCommitedOffsetsMetadata( - Set partitionOffsets) { - Set topicPartitions = - partitionOffsets.stream() - .map( - offset -> - new TopicPartition( - offset.getKafkaTopicName().asString(), offset.getPartition())) - .collect(Collectors.toSet()); - - return consumer.committed(topicPartitions); - } } diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java index c9a8293b0b..fbcfe4fdab 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java @@ -182,7 +182,7 @@ public void waitUntilConsumerCommitsOffset(String topicQualifiedName, String sub }); } - private long calculateCommittedMessages(String topicQualifiedName, String subscription) { + public long calculateCommittedMessages(String topicQualifiedName, String subscription) { AtomicLong messagesCommittedCount = new AtomicLong(0); List consumerGroups = getConsumerGroupsDescription(topicQualifiedName, subscription) diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java index 94c7eb8f43..f23aae03ec 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java @@ -71,6 +71,9 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { publishAndConsumeMessages(messages2, topic, subscriber); hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName()); + long commitedMessages = + hermes.api().calculateCommittedMessages(topic.getQualifiedName(), subscription.getName()); + // when WebTestClient.ResponseSpec response = hermes @@ -80,7 +83,18 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { // then response.expectStatus().isOk(); + assertThat( + hermes + .api() + .calculateCommittedMessages(topic.getQualifiedName(), subscription.getName())) + .isLessThan(commitedMessages); messages2.forEach(subscriber::waitUntilReceived); + hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName()); + assertThat( + hermes + .api() + .calculateCommittedMessages(topic.getQualifiedName(), subscription.getName())) + .isEqualTo(commitedMessages); } @Test From eff890bb5e333672b3a54d4fbaa3361c939d6a01 Mon Sep 17 00:00:00 2001 From: "marcin.bobinski" Date: Tue, 7 Jan 2025 12:43:36 +0100 Subject: [PATCH 4/6] SKYEDEN-3020 | add comment --- .../hermes/integrationtests/KafkaRetransmissionServiceTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java index f23aae03ec..8d8584b8ea 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java @@ -83,6 +83,7 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { // then response.expectStatus().isOk(); + // Check if Kafka committed offsets were moved on retransmission assertThat( hermes .api() From 4e2865e0ac7d64a9fb8fa99537a0953ca52f5c95 Mon Sep 17 00:00:00 2001 From: "marcin.bobinski" Date: Fri, 10 Jan 2025 09:51:23 +0100 Subject: [PATCH 5/6] SKYEDEN-3020 | resolve conflicts --- .../KafkaRetransmissionServiceTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java index e17e410b0c..9c6f47cc43 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java @@ -90,19 +90,20 @@ public void shouldMoveOffsetNearGivenTimestamp(boolean suspendedSubscription) .retransmit( topic.getQualifiedName(), subscription.getName(), retransmissionDate, false); - if (suspendedSubscription) { - hermes.api().activateSubscription(topic, subscription.getName()); - hermes.api().waitUntilSubscriptionActivated(topic.getQualifiedName(), subscription.getName()); - } - // then - response.expectStatus().isOk(); // Check if Kafka committed offsets were moved on retransmission assertThat( hermes .api() .calculateCommittedMessages(topic.getQualifiedName(), subscription.getName())) .isLessThan(commitedMessages); + response.expectStatus().isOk(); + + if (suspendedSubscription) { + hermes.api().activateSubscription(topic, subscription.getName()); + hermes.api().waitUntilSubscriptionActivated(topic.getQualifiedName(), subscription.getName()); + } + messages2.forEach(subscriber::waitUntilReceived); hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName()); assertThat( From 33f6a248d8782ac6f6d7e4b4cf3eb27d024c4d6a Mon Sep 17 00:00:00 2001 From: "marcin.bobinski" Date: Fri, 10 Jan 2025 09:56:57 +0100 Subject: [PATCH 6/6] SKYEDEN-3020 | refactor --- .../hermes/integrationtests/KafkaRetransmissionServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java index 9c6f47cc43..0daa5be05a 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java @@ -91,13 +91,13 @@ public void shouldMoveOffsetNearGivenTimestamp(boolean suspendedSubscription) topic.getQualifiedName(), subscription.getName(), retransmissionDate, false); // then + response.expectStatus().isOk(); // Check if Kafka committed offsets were moved on retransmission assertThat( hermes .api() .calculateCommittedMessages(topic.getQualifiedName(), subscription.getName())) .isLessThan(commitedMessages); - response.expectStatus().isOk(); if (suspendedSubscription) { hermes.api().activateSubscription(topic, subscription.getName());