diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index fa46e65724d28..ac081f1ebe46a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -580,8 +580,8 @@ public void run() { FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(entryQueue, managedCursor, TopicTransactionBufferRecover.this); if (lastConfirmedEntry.getEntryId() != -1) { - while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) { - fillEntryQueueCallback.fillQueue(); + while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 + && fillEntryQueueCallback.fillQueue()) { Entry entry = entryQueue.poll(); if (entry != null) { try { @@ -639,19 +639,22 @@ static class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallbac private final TopicTransactionBufferRecover recover; + private volatile boolean isReadable = true; + private FillEntryQueueCallback(SpscArrayQueue entryQueue, ManagedCursor cursor, TopicTransactionBufferRecover recover) { this.entryQueue = entryQueue; this.cursor = cursor; this.recover = recover; } - void fillQueue() { + boolean fillQueue() { if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) { if (cursor.hasMoreEntries()) { outstandingReadsRequests.incrementAndGet(); cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.latest); } } + return isReadable; } @Override @@ -671,6 +674,11 @@ public Entry get() { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + if (recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData() + && exception instanceof ManagedLedgerException.NonRecoverableLedgerException + || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) { + isReadable = false; + } recover.callBackException(exception); outstandingReadsRequests.decrementAndGet(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index e852a4fd1e1f8..bc8bef96be1ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -20,6 +20,12 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; import com.google.common.collect.Sets; import io.netty.buffer.Unpooled; import java.lang.reflect.Field; @@ -30,8 +36,11 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -39,6 +48,9 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; @@ -423,4 +435,51 @@ public void testMaxReadPositionForNormalPublish() throws Exception{ Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId()); } + + @Test + public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{ + String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable"; + admin.topics().createNonPartitionedTopic(topic); + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .producerName("test") + .enableBatching(false) + .sendTimeout(0, TimeUnit.SECONDS) + .topic(topic) + .create(); + producer.newMessage().send(); + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() + .getTopic(topic, false).get().get(); + persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true); + + ManagedCursor managedCursor = mock(ManagedCursor.class); + doReturn("transaction-buffer-sub").when(managedCursor).getName(); + doReturn(true).when(managedCursor).hasMoreEntries(); + doAnswer(invocation -> { + AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1); + callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"), + null); + return null; + }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any()); + Class managedLedgerClass = ManagedLedgerImpl.class; + Field field = managedLedgerClass.getDeclaredField("cursors"); + field.setAccessible(true); + ManagedCursorContainer managedCursors = (ManagedCursorContainer) field.get(persistentTopic.getManagedLedger()); + managedCursors.removeCursor("transaction-buffer-sub"); + managedCursors.add(managedCursor); + + TransactionBuffer buffer1 = new TopicTransactionBuffer(persistentTopic); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(buffer1.getStats().state, "Ready")); + + doAnswer(invocation -> { + AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1); + callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null); + return null; + }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any()); + + TransactionBuffer buffer2 = new TopicTransactionBuffer(persistentTopic); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(buffer2.getStats().state, "Ready")); + } } \ No newline at end of file