From b71cc53e87ef66f25e689595bd2624fe5e2f5a3e Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 2 Jan 2025 18:33:52 +0800 Subject: [PATCH 1/6] [fix] [broker] Fix items in dispatcher.recentlyJoinedConsumers are out-of-order, which may cause a delivery stuck --- ...entDispatcherMultipleConsumersClassic.java | 4 + ...KeyDispatcherMultipleConsumersClassic.java | 23 +- ...ntryCacheKeySharedSubscriptionV30Test.java | 290 ++++++++++++++++++ 3 files changed, 316 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 6ab7acfa56da8..9eb3f45e353ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -627,6 +627,8 @@ public SubType getType() { return SubType.Shared; } + protected synchronized void afterRewindAfterPendingRead(){} + @Override public final synchronized void readEntriesComplete(List entries, Object ctx) { ReadType readType = (ReadType) ctx; @@ -652,6 +654,7 @@ public final synchronized void readEntriesComplete(List entries, Object c entries.forEach(Entry::release); cursor.rewind(); shouldRewindBeforeReadingOrReplaying = false; + afterRewindAfterPendingRead(); readMoreEntries(); return; } @@ -935,6 +938,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj if (shouldRewindBeforeReadingOrReplaying) { shouldRewindBeforeReadingOrReplaying = false; cursor.rewind(); + afterRewindAfterPendingRead(); } if (readType == ReadType.Normal) { diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 71f37c5939d6a..4d4c9f69459bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import lombok.Setter; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; @@ -82,6 +83,15 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic */ private final LinkedHashMap recentlyJoinedConsumers; + /** + * The method {@link #sortRecentlyJoinedConsumersIfNeeded} is a workaround that was added to fix the issue + * described at https://github.com/apache/pulsar/pull/23795. + * To cover the case that does not contain the hot fix that https://github.com/apache/pulsar/pull/23795 provided, + * tests set this flag to {@code false} to disable the workaround and reproduce the issue. 
+ **/ + @Setter + public boolean sortRecentlyJoinedConsumersIfNeeded = true; + PersistentStickyKeyDispatcherMultipleConsumersClassic(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { @@ -112,6 +122,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic } } + @Override + protected synchronized void afterRewindAfterPendingRead() { + recentlyJoinedConsumers.clear(); + } + @VisibleForTesting public StickyKeyConsumerSelector getSelector() { return selector; @@ -153,6 +168,9 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { } private void sortRecentlyJoinedConsumersIfNeeded() { + if (!sortRecentlyJoinedConsumersIfNeeded) { + return; + } if (recentlyJoinedConsumers.size() == 1) { return; } @@ -173,8 +191,11 @@ private void sortRecentlyJoinedConsumersIfNeeded() { posPre = posAfter; } } - if (sortNeeded) { + log.error("[{}] [{}] The items in recentlyJoinedConsumers are out-of-order. {}", + topic.getName(), name, recentlyJoinedConsumers.entrySet().stream().map(entry -> + String.format("%s-%s:%s", entry.getKey().consumerName(), entry.getValue().getLedgerId(), + entry.getValue().getEntryId())).collect(Collectors.toList())); List> sortedList = new ArrayList<>(recentlyJoinedConsumers.entrySet()); Collections.sort(sortedList, Map.Entry.comparingByValue()); recentlyJoinedConsumers.clear(); diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java new file mode 100644 index 0000000000000..d106d7b1b2e73 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertTrue; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers; +import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersClassic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; +import 
org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.awaitility.Awaitility; +import org.mockito.stubbing.Answer; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class NonEntryCacheKeySharedSubscriptionV30Test extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + protected void doInitConf() throws Exception { + super.doInitConf(); + this.conf.setManagedLedgerCacheSizeMB(0); + this.conf.setManagedLedgerMaxEntriesPerLedger(50000); + // Use the implementation of subscriptions in v3.x. + this.conf.setSubscriptionKeySharedUseClassicPersistentImplementation(true); + } + + + @Test(timeOut = 180 * 1000, invocationCount = 1) + public void testRecentJoinQueueIsInOrderAfterRewind() throws Exception { + int msgCount = 300; + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subName = "my-sub"; + final DefaultThreadFactory threadFactory = + new DefaultThreadFactory(BrokerTestUtil.newUniqueName("thread")); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subName, MessageId.earliest); + + // Send messages. 
+ @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create(); + AtomicInteger msgGenerator = new AtomicInteger(); + for (int i = 0; i < msgCount; i++) { + int v = msgGenerator.getAndIncrement(); + producer.newMessage().key(String.valueOf(v)).value(v).send(); + } + + // Create three Key_Shared consumers. + Consumer consumer1 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName(subName) + .receiverQueueSize(100) + .subscriptionType(SubscriptionType.Key_Shared) + .consumerName("c1") + .subscribe(); + Consumer consumer2 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName(subName) + .receiverQueueSize(100) + .subscriptionType(SubscriptionType.Key_Shared) + .consumerName("c2") + .subscribe(); + Consumer consumer3 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName(subName) + .receiverQueueSize(100) + .subscriptionType(SubscriptionType.Key_Shared) + .consumerName("c3") + .subscribe(); + final PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + final ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + final PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName); + final PersistentStickyKeyDispatcherMultipleConsumersClassic dispatcher = + (PersistentStickyKeyDispatcherMultipleConsumersClassic) persistentSubscription.getDispatcher(); + final ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(subName); + dispatcher.setSortRecentlyJoinedConsumersIfNeeded(false); + + // Make ack holes. + // - ack all messages that consumer1 or consumer2 received. + // - do not ack messages that consumer3 received. 
+ ackAllMessages(consumer1, consumer2); + Position mdPosition = (Position) cursor.getMarkDeletedPosition(); + Position readPosition = (Position) cursor.getReadPosition(); + Position LAC = (Position) ml.getLastConfirmedEntry(); + assertTrue(readPosition.compareTo(LAC) >= 0); + Position firstWaitingAckPos = ml.getNextValidPosition(mdPosition); + log.info("md-pos {}:{}", mdPosition.getLedgerId(), mdPosition.getEntryId()); + log.info("rd-pos {}:{}", readPosition.getLedgerId(), readPosition.getEntryId()); + log.info("lac-pos {}:{}", LAC.getLedgerId(), LAC.getEntryId()); + log.info("first-waiting-ack-pos {}:{}", firstWaitingAckPos.getLedgerId(), firstWaitingAckPos.getEntryId()); + + // Inject a delay for the next replay read. + LedgerHandle firstLedger = ml.currentLedger; + Assert.assertEquals(firstWaitingAckPos.getLedgerId(), firstLedger.getId()); + LedgerHandle spyFirstLedger = spy(firstLedger); + CountDownLatch replyReadSignal = new CountDownLatch(1); + AtomicBoolean replayReadWasTriggered = new AtomicBoolean(); + Answer answer = invocation -> { + long firstEntry = (long) invocation.getArguments()[0]; + if (firstEntry == firstWaitingAckPos.getEntryId()) { + replayReadWasTriggered.set(true); + final CompletableFuture res = new CompletableFuture<>(); + threadFactory.newThread(() -> { + try { + replyReadSignal.await(); + CompletableFuture future = + (CompletableFuture) invocation.callRealMethod(); + future.thenAccept(v -> { + res.complete(v); + }).exceptionally(ex -> { + res.completeExceptionally(ex); + return null; + }); + } catch (Throwable ex) { + res.completeExceptionally(ex); + } + }).start(); + return res; + } else { + return invocation.callRealMethod(); + } + }; + doAnswer(answer).when(spyFirstLedger).readAsync(anyLong(), anyLong()); + doAnswer(answer).when(spyFirstLedger).readUnconfirmedAsync(anyLong(), anyLong()); + ml.currentLedger = spyFirstLedger; + + // Keep publish to avoid pending normal read. 
+ AtomicBoolean keepPublishing = new AtomicBoolean(true); + new Thread(() -> { + while (keepPublishing.get()) { + int v = msgGenerator.getAndIncrement(); + producer.newMessage().key(String.valueOf(v)).value(v).sendAsync(); + sleep(100); + } + }).start(); + + // Trigger a message redelivery. + consumer3.close(); + Awaitility.await().until(() -> replayReadWasTriggered.get()); + + // Close all consumers to trigger a cursor.rewind. + consumer1.close(); + consumer2.close(); + + // Start 40 consumers. + List>> consumerList = new ArrayList<>(); + for (int i = 0; i < 40; i++) { + consumerList.add(pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribeAsync()); + if (i == 10) { + for (int j = 0; j < msgCount; j++) { + int v = msgGenerator.getAndIncrement(); + producer.newMessage().key(String.valueOf(v)).value(v).send(); + } + final Consumer firstConsumer = consumerList.get(0).join(); + ackAllMessages(firstConsumer); + new Thread(() -> { + while (keepPublishing.get()) { + try { + ackAllMessages(firstConsumer); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }).start(); + } + log.info("recent-joined-consumers {} {}", i, dispatcher.getRecentlyJoinedConsumers().size()); + if (dispatcher.getRecentlyJoinedConsumers().size() > 0) { + Position mdPosition2 = (Position) cursor.getMarkDeletedPosition(); + Position readPosition2 = (Position) cursor.getReadPosition(); + Position LAC2 = (Position) ml.getLastConfirmedEntry(); + assertTrue(readPosition.compareTo(LAC) >= 0); + Position firstWaitingAckPos2 = ml.getNextValidPosition(mdPosition); + if(readPosition2.compareTo(firstWaitingAckPos) > 0) { + keepPublishing.set(false); + log.info("consumer-index: {}", i); + log.info("md-pos-2 {}:{}", mdPosition2.getLedgerId(), mdPosition2.getEntryId()); + log.info("rd-pos-2 {}:{}", readPosition2.getLedgerId(), readPosition2.getEntryId()); + log.info("lac-pos-2 {}:{}", LAC2.getLedgerId(), 
LAC2.getEntryId()); + log.info("first-waiting-ack-pos-2 {}:{}", firstWaitingAckPos2.getLedgerId(), + firstWaitingAckPos2.getEntryId()); + // finish the replay read here. + replyReadSignal.countDown(); + } else { + sleep(1000); + } + } + } + consumerList.get(consumerList.size() - 1).join(); + + synchronized (dispatcher) { + LinkedHashMap recentJoinedConsumers = dispatcher.getRecentlyJoinedConsumers(); + assertTrue(verifyMapItemsAreInOrder(recentJoinedConsumers)); + } + + // cleanup. + producer.close(); + for (CompletableFuture> c : consumerList) { + c.join().close(); + } + admin.topics().delete(topic, false); + } + + private void sleep(int millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private boolean verifyMapItemsAreInOrder(LinkedHashMap map) { + boolean outOfOrder = false; + Position posPre = null; + Position posAfter = null; + for (Map.Entry entry : map.entrySet()) { + if (posPre == null) { + posPre = (Position) entry.getValue(); + } else { + posAfter = (Position) entry.getValue(); + } + if (posPre != null && posAfter != null) { + if (posPre.compareTo(posAfter) > 0) { + outOfOrder = true; + break; + } + posPre = posAfter; + } + } + return !outOfOrder; + } +} From d08b189a345df4f471cdad842d87ae2244f0fb0f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 2 Jan 2025 22:02:44 +0800 Subject: [PATCH 2/6] checkstyle --- .../mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java index d106d7b1b2e73..0de3d4edfad64 100644 --- a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java +++ 
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java @@ -37,7 +37,6 @@ import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.BrokerTestUtil; -import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersClassic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; From 8d15073f6a1add8f1210e8141ede80a9d6b76d4d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 3 Jan 2025 09:57:19 +0800 Subject: [PATCH 3/6] address comment --- .../PersistentDispatcherMultipleConsumersClassic.java | 6 +++--- ...rsistentStickyKeyDispatcherMultipleConsumersClassic.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 9eb3f45e353ca..6c464a707bc25 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -627,7 +627,7 @@ public SubType getType() { return SubType.Shared; } - protected synchronized void afterRewindAfterPendingRead(){} + protected synchronized void finishedRewindAfterPendingRead(){} @Override public final synchronized void readEntriesComplete(List entries, Object ctx) { @@ -654,7 +654,7 @@ public final synchronized void readEntriesComplete(List entries, Object c entries.forEach(Entry::release); cursor.rewind(); 
shouldRewindBeforeReadingOrReplaying = false; - afterRewindAfterPendingRead(); + finishedRewindAfterPendingRead(); readMoreEntries(); return; } @@ -938,7 +938,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj if (shouldRewindBeforeReadingOrReplaying) { shouldRewindBeforeReadingOrReplaying = false; cursor.rewind(); - afterRewindAfterPendingRead(); + finishedRewindAfterPendingRead(); } if (readType == ReadType.Normal) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 4d4c9f69459bf..fabef5d7c6977 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -123,7 +123,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic } @Override - protected synchronized void afterRewindAfterPendingRead() { + protected synchronized void finishedRewindAfterPendingRead() { recentlyJoinedConsumers.clear(); } From 3a568ea3c768ae72b357c4ee41a772836a7e8335 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 3 Jan 2025 09:59:18 +0800 Subject: [PATCH 4/6] address comment --- .../PersistentDispatcherMultipleConsumersClassic.java | 6 +++--- ...rsistentStickyKeyDispatcherMultipleConsumersClassic.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 6c464a707bc25..b128a1a371115 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -627,7 +627,7 @@ public SubType getType() { return SubType.Shared; } - protected synchronized void finishedRewindAfterPendingRead(){} + protected synchronized void finishedRewindAfterInProgressReading(){} @Override public final synchronized void readEntriesComplete(List entries, Object ctx) { @@ -654,7 +654,7 @@ public final synchronized void readEntriesComplete(List entries, Object c entries.forEach(Entry::release); cursor.rewind(); shouldRewindBeforeReadingOrReplaying = false; - finishedRewindAfterPendingRead(); + finishedRewindAfterInProgressReading(); readMoreEntries(); return; } @@ -938,7 +938,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj if (shouldRewindBeforeReadingOrReplaying) { shouldRewindBeforeReadingOrReplaying = false; cursor.rewind(); - finishedRewindAfterPendingRead(); + finishedRewindAfterInProgressReading(); } if (readType == ReadType.Normal) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index fabef5d7c6977..8bc0dfd16126e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -123,7 +123,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic } @Override - protected synchronized void finishedRewindAfterPendingRead() { + protected synchronized void finishedRewindAfterInProgressReading() { 
recentlyJoinedConsumers.clear(); } From 303e56279e4f11ccb601662b0cbeb2808fe5d31a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 3 Jan 2025 11:25:06 +0800 Subject: [PATCH 5/6] improve logics --- ...entDispatcherMultipleConsumersClassic.java | 4 ---- ...KeyDispatcherMultipleConsumersClassic.java | 21 ++++++++----------- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index b128a1a371115..6ab7acfa56da8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -627,8 +627,6 @@ public SubType getType() { return SubType.Shared; } - protected synchronized void finishedRewindAfterInProgressReading(){} - @Override public final synchronized void readEntriesComplete(List entries, Object ctx) { ReadType readType = (ReadType) ctx; @@ -654,7 +652,6 @@ public final synchronized void readEntriesComplete(List entries, Object c entries.forEach(Entry::release); cursor.rewind(); shouldRewindBeforeReadingOrReplaying = false; - finishedRewindAfterInProgressReading(); readMoreEntries(); return; } @@ -938,7 +935,6 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj if (shouldRewindBeforeReadingOrReplaying) { shouldRewindBeforeReadingOrReplaying = false; cursor.rewind(); - finishedRewindAfterInProgressReading(); } if (readType == ReadType.Normal) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 8bc0dfd16126e..6d4c5866a9eff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -122,11 +122,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic } } - @Override - protected synchronized void finishedRewindAfterInProgressReading() { - recentlyJoinedConsumers.clear(); - } - @VisibleForTesting public StickyKeyConsumerSelector getSelector() { return selector; @@ -159,7 +154,8 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { if (!allowOutOfOrderDelivery && recentlyJoinedConsumers != null && consumerList.size() > 1 - && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) { + && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1 + && !shouldRewindBeforeReadingOrReplaying) { recentlyJoinedConsumers.put(consumer, readPositionWhenJoining); sortRecentlyJoinedConsumersIfNeeded(); } @@ -174,6 +170,7 @@ private void sortRecentlyJoinedConsumersIfNeeded() { if (recentlyJoinedConsumers.size() == 1) { return; } + // Since we check the order of queue after each consumer joined, we can only check the last two items. 
boolean sortNeeded = false; Position posPre = null; Position posAfter = null; @@ -181,16 +178,16 @@ private void sortRecentlyJoinedConsumersIfNeeded() { if (posPre == null) { posPre = entry.getValue(); } else { + posPre = posAfter; posAfter = entry.getValue(); } - if (posPre != null && posAfter != null) { - if (posPre.compareTo(posAfter) > 0) { - sortNeeded = true; - break; - } - posPre = posAfter; + } + if (posPre != null && posAfter != null) { + if (posPre.compareTo(posAfter) > 0) { + sortNeeded = true; } } + // Something went wrong, sort the collection. if (sortNeeded) { log.error("[{}] [{}] The items in recentlyJoinedConsumers are out-of-order. {}", topic.getName(), name, recentlyJoinedConsumers.entrySet().stream().map(entry -> From 82b9eea5bdfa12b2ca18b11303d5ea6b7adfb24b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 3 Jan 2025 11:48:03 +0800 Subject: [PATCH 6/6] add more code comments --- .../PersistentStickyKeyDispatcherMultipleConsumersClassic.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 6d4c5866a9eff..56161d8dd1544 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -155,6 +155,9 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { && recentlyJoinedConsumers != null && consumerList.size() > 1 && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1 + // If there is a delayed "cursor.rewind" after the pending read, the consumers that will be + // added before the "cursor.rewind" will have the same "recent joined 
position", which is the + // same as "mark deleted position +1", so we can skip this adding. && !shouldRewindBeforeReadingOrReplaying) { recentlyJoinedConsumers.put(consumer, readPositionWhenJoining); sortRecentlyJoinedConsumersIfNeeded();