Skip to content

Commit

Permalink
[Branch-2.9] [Transaction] Stop TB recovering with exception (#13425)
Browse files Browse the repository at this point in the history
Cherry-pick from #12636
  • Loading branch information
congbobo184 authored Dec 21, 2021
1 parent f6be11a commit 44dbe70
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,8 @@ public void run() {
FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(entryQueue, managedCursor,
TopicTransactionBufferRecover.this);
if (lastConfirmedEntry.getEntryId() != -1) {
while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
fillEntryQueueCallback.fillQueue();
while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
&& fillEntryQueueCallback.fillQueue()) {
Entry entry = entryQueue.poll();
if (entry != null) {
try {
Expand Down Expand Up @@ -641,19 +641,22 @@ static class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallbac

private final TopicTransactionBufferRecover recover;

private volatile boolean isReadable = true;

private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue, ManagedCursor cursor,
TopicTransactionBufferRecover recover) {
this.entryQueue = entryQueue;
this.cursor = cursor;
this.recover = recover;
}
void fillQueue() {
boolean fillQueue() {
if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
if (cursor.hasMoreEntries()) {
outstandingReadsRequests.incrementAndGet();
cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.latest);
}
}
return isReadable;
}

@Override
Expand All @@ -673,6 +676,11 @@ public Entry get() {

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
if (recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
&& exception instanceof ManagedLedgerException.NonRecoverableLedgerException
|| exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
isReadable = false;
}
recover.callBackException(exception);
outstandingReadsRequests.decrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
Expand All @@ -57,6 +58,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
Expand Down Expand Up @@ -363,6 +365,7 @@ public void testTakeSnapshotBeforeBuildTxnProducer() throws Exception {
});
}


@Test
public void testAppendBufferWithNotManageLedgerExceptionCanCastToMLE()
throws Exception {
Expand Down Expand Up @@ -503,7 +506,57 @@ public void testMaxReadPositionForNormalPublish() throws Exception {
PositionImpl position5 = (PositionImpl) maxReadPositionField.get(topicTransactionBuffer);
Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId());
Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());
}

@Test
public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable";
admin.topics().createNonPartitionedTopic(topic);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.producerName("test")
.enableBatching(false)
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topic)
.create();
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

producer.newMessage(txn).value("test").send();

PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic("persistent://" + topic, false).get().get();
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);

ManagedCursor managedCursor = mock(ManagedCursor.class);
doReturn("transaction-buffer-sub").when(managedCursor).getName();
doReturn(true).when(managedCursor).hasMoreEntries();
doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
Class<ManagedLedgerImpl> managedLedgerClass = ManagedLedgerImpl.class;
Field field = managedLedgerClass.getDeclaredField("cursors");
field.setAccessible(true);
ManagedCursorContainer managedCursors = (ManagedCursorContainer) field.get(persistentTopic.getManagedLedger());
managedCursors.removeCursor("transaction-buffer-sub");
managedCursors.add(managedCursor);

TransactionBuffer buffer1 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer1.getStats().state, "Ready"));

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

TransactionBuffer buffer2 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer2.getStats().state, "Ready"));
}

@Test
Expand Down Expand Up @@ -565,7 +618,7 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

@Test
public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable";
String topic = NAMESPACE1 + "/testEndTCRecoveringWhenManagerLedgerDisReadable";
admin.topics().createNonPartitionedTopic(topic);

PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
Expand Down

0 comments on commit 44dbe70

Please sign in to comment.