Skip to content

Commit

Permalink
[cherry-pick][branch-2.10] Close TransactionBuffer when create persis…
Browse files Browse the repository at this point in the history
…tent topic timeout (apache#19454)

Co-authored-by: Tao Jiuming <[email protected]>
(cherry picked from commit 4a1ac0a)
  • Loading branch information
liangyepianzhou authored and nicoloboschi committed Feb 28, 2023
1 parent 9ad4925 commit 8b003f9
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,6 @@ public class ManagedCursorImpl implements ManagedCursor {
// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;

// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;

// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;

class MarkDeleteEntry {
final PositionImpl newPosition;
final MarkDeleteCallback callback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1551,8 +1551,15 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
if (topicFuture.isCompletedExceptionally()) {
// Check create persistent topic timeout.
log.warn("{} future is already completed with failure {}, closing the"
+ " topic", topic, FutureUtil.getException(topicFuture));
persistentTopic.getTransactionBuffer()
.closeAsync()
.exceptionally(t -> {
log.error("[{}] Close transactionBuffer failed", topic, t);
return null;
});
persistentTopic.stopReplProducers()
.whenCompleteAsync((v, exception) -> {
topics.remove(topic, topicFuture);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.Cleanup;
Expand Down Expand Up @@ -136,7 +135,8 @@ public void testIncrementPartitionsOfTopicWithSubscriptionProperties() throws Ex
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);

assertEquals(admin.topics().getSubscriptions(
TopicName.get(partitionedTopicName).getPartition(15).toString()), List.of("sub-1"));
TopicName.get(partitionedTopicName).getPartition(15).toString()),
Lists.newArrayList("sub-1"));
TopicStats stats = admin.topics()
.getStats(TopicName.get(partitionedTopicName).getPartition(15).toString());
Map<String, String> subscriptionProperties = stats.getSubscriptions()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
Expand All @@ -42,8 +43,11 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -138,4 +142,38 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep
Assert.assertEquals(ttb.getState(), expectState);
}


@Test
public void testCloseTransactionBufferWhenTimeout() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
PulsarService pulsar = pulsarServiceList.get(0);
BrokerService brokerService0 = pulsar.getBrokerService();
BrokerService brokerService = Mockito.spy(brokerService0);
AtomicReference<PersistentTopic> reference = new AtomicReference<>();
pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10);
long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1);

Mockito
.doAnswer(inv -> {
Thread.sleep(topicLoadTimeout);
PersistentTopic persistentTopic = (PersistentTopic) inv.callRealMethod();
reference.set(persistentTopic);
return persistentTopic;
})
.when(brokerService)
.newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService));

CompletableFuture<Optional<Topic>> f = brokerService.getTopic(topic, true);

Awaitility.waitAtMost(20, TimeUnit.SECONDS)
.pollInterval(Duration.ofSeconds(2)).until(() -> reference.get() != null);
PersistentTopic persistentTopic = reference.get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
Assert.assertEquals(ttb.getState(), expectState);
Assert.assertTrue(f.isCompletedExceptionally());
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down

0 comments on commit 8b003f9

Please sign in to comment.