From 88159777847e14ebed1caab8c35a4a8a61e004a6 Mon Sep 17 00:00:00 2001 From: congbo Date: Tue, 21 Dec 2021 12:59:09 +0800 Subject: [PATCH] [2.9.2] Cherry pick pr #12636 --- .../buffer/impl/TopicTransactionBuffer.java | 14 ++++- .../broker/transaction/TransactionTest.java | 55 ++++++++++++++++++- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 84b209c6cf3a4..9978f6f8ec20a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -582,8 +582,8 @@ public void run() { FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(entryQueue, managedCursor, TopicTransactionBufferRecover.this); if (lastConfirmedEntry.getEntryId() != -1) { - while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) { - fillEntryQueueCallback.fillQueue(); + while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 + && fillEntryQueueCallback.fillQueue()) { Entry entry = entryQueue.poll(); if (entry != null) { try { @@ -641,19 +641,22 @@ static class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallbac private final TopicTransactionBufferRecover recover; + private volatile boolean isReadable = true; + private FillEntryQueueCallback(SpscArrayQueue entryQueue, ManagedCursor cursor, TopicTransactionBufferRecover recover) { this.entryQueue = entryQueue; this.cursor = cursor; this.recover = recover; } - void fillQueue() { + boolean fillQueue() { if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) { if (cursor.hasMoreEntries()) { outstandingReadsRequests.incrementAndGet(); cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.latest); } } + return isReadable; } @Override @@ -673,6 +676,11 @@ public Entry get() { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + if (recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData() + && exception instanceof ManagedLedgerException.NonRecoverableLedgerException + || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) { + isReadable = false; + } recover.callBackException(exception); outstandingReadsRequests.decrementAndGet(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index e4975d97cea0a..0b2fde55319f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -49,6 +49,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -56,6 +57,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; @@ -502,9 +504,60 @@ public void testMaxReadPositionForNormalPublish() throws Exception { } @Test - public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{ + public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{ String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable"; admin.topics().createNonPartitionedTopic(topic); + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .producerName("test") + .enableBatching(false) + .sendTimeout(0, TimeUnit.SECONDS) + .topic(topic) + .create(); + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + + producer.newMessage(txn).value("test").send(); + + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() + .getTopic("persistent://" + topic, false).get().get(); + persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true); + + ManagedCursor managedCursor = mock(ManagedCursor.class); + doReturn("transaction-buffer-sub").when(managedCursor).getName(); + doReturn(true).when(managedCursor).hasMoreEntries(); + doAnswer(invocation -> { + AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1); + callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"), + null); + return null; + }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any()); + Class managedLedgerClass = ManagedLedgerImpl.class; + Field field = managedLedgerClass.getDeclaredField("cursors"); + field.setAccessible(true); + ManagedCursorContainer managedCursors = (ManagedCursorContainer) field.get(persistentTopic.getManagedLedger()); + managedCursors.removeCursor("transaction-buffer-sub"); + managedCursors.add(managedCursor); + + TransactionBuffer buffer1 = new TopicTransactionBuffer(persistentTopic); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(buffer1.getStats().state, "Ready")); + + doAnswer(invocation -> { + AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1); + callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null); + return null; + }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any()); + + TransactionBuffer buffer2 = new TopicTransactionBuffer(persistentTopic); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(buffer2.getStats().state, "Ready")); + } + + @Test + public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{ + String topic = NAMESPACE1 + "/testEndTCRecoveringWhenManagerLedgerDisReadable"; + admin.topics().createNonPartitionedTopic(topic); PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() .getTopic(topic, false).get().get();