diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index b128a1a371115..6ab7acfa56da8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -627,8 +627,6 @@ public SubType getType() { return SubType.Shared; } - protected synchronized void finishedRewindAfterInProgressReading(){} - @Override public final synchronized void readEntriesComplete(List entries, Object ctx) { ReadType readType = (ReadType) ctx; @@ -654,7 +652,6 @@ public final synchronized void readEntriesComplete(List entries, Object c entries.forEach(Entry::release); cursor.rewind(); shouldRewindBeforeReadingOrReplaying = false; - finishedRewindAfterInProgressReading(); readMoreEntries(); return; } @@ -938,7 +935,6 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj if (shouldRewindBeforeReadingOrReplaying) { shouldRewindBeforeReadingOrReplaying = false; cursor.rewind(); - finishedRewindAfterInProgressReading(); } if (readType == ReadType.Normal) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 8bc0dfd16126e..6d4c5866a9eff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -122,11 +122,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic } } - @Override - protected synchronized void finishedRewindAfterInProgressReading() { - recentlyJoinedConsumers.clear(); - } - @VisibleForTesting public StickyKeyConsumerSelector getSelector() { return selector; @@ -159,7 +154,8 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { if (!allowOutOfOrderDelivery && recentlyJoinedConsumers != null && consumerList.size() > 1 - && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) { + && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1 + && !shouldRewindBeforeReadingOrReplaying) { recentlyJoinedConsumers.put(consumer, readPositionWhenJoining); sortRecentlyJoinedConsumersIfNeeded(); } @@ -174,6 +170,7 @@ private void sortRecentlyJoinedConsumersIfNeeded() { if (recentlyJoinedConsumers.size() == 1) { return; } + // Since we check the order of queue after each consumer joined, we can only check the last two items. boolean sortNeeded = false; Position posPre = null; Position posAfter = null; @@ -181,16 +178,16 @@ private void sortRecentlyJoinedConsumersIfNeeded() { if (posPre == null) { posPre = entry.getValue(); } else { + posPre = posAfter; posAfter = entry.getValue(); } - if (posPre != null && posAfter != null) { - if (posPre.compareTo(posAfter) > 0) { - sortNeeded = true; - break; - } - posPre = posAfter; + } + if (posPre != null && posAfter != null) { + if (posPre.compareTo(posAfter) > 0) { + sortNeeded = true; } } + // Something went wrongly, sort the collection. if (sortNeeded) { log.error("[{}] [{}] The items in recentlyJoinedConsumers are out-of-order. {}", topic.getName(), name, recentlyJoinedConsumers.entrySet().stream().map(entry ->