Skip to content

Commit

Permalink
[fix][broker] Closed topics won't be removed from the cache
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Jan 26, 2025
1 parent e5bd774 commit 6cdf378
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -95,6 +96,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;

protected final String topic;
@Getter
@Setter
protected volatile CompletableFuture<Optional<Topic>> createFuture;

// Producers currently connected to this topic
protected final ConcurrentHashMap<String, Producer> producers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
NonPersistentTopic nonPersistentTopic;
try {
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
nonPersistentTopic.setCreateFuture(topicFuture);
} catch (Throwable e) {
log.warn("Failed to create topic {}", topic, e);
topicFuture.completeExceptionally(e);
Expand Down Expand Up @@ -1800,6 +1801,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: newTopic(topic, ledger, BrokerService.this, PersistentTopic.class);
persistentTopic.setCreateFuture(topicFuture);
persistentTopic
.initialize()
.thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
Expand Down Expand Up @@ -2409,42 +2411,7 @@ public AuthorizationService getAuthorizationService() {
return authorizationService;
}

/**
 * Removes the given topic from the topic cache, but only when the cache entry currently
 * holds this exact topic instance.
 *
 * @param topic the topic instance to evict; {@code null} results in a no-op
 * @return the future returned by {@code removeTopicFutureFromCache} when a matching cache entry
 *         was found, otherwise an already-completed future
 */
public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
    final Optional<CompletableFuture<Optional<Topic>>> cachedFuture = findTopicFutureInCache(topic);
    if (cachedFuture.isPresent()) {
        // The cache entry belongs to this very instance, so it is safe to evict it.
        return removeTopicFutureFromCache(topic.getName(), cachedFuture.get());
    }
    // Nothing in the cache refers to this instance: there is nothing to remove.
    return CompletableFuture.completedFuture(null);
}

/**
 * Looks up the cached creation future that resolved to exactly the given topic instance.
 *
 * @param topic the topic instance to look for; may be {@code null}
 * @return the cached future when it has completed normally with this same instance
 *         (compared by reference); an empty {@code Optional} in every other case
 */
private Optional<CompletableFuture<Optional<Topic>>> findTopicFutureInCache(Topic topic) {
    if (topic == null) {
        return Optional.empty();
    }
    final CompletableFuture<Optional<Topic>> cachedFuture = topics.get(topic.getName());
    // A missing, still-pending or failed future cannot hold the instance we were given.
    if (cachedFuture == null || !cachedFuture.isDone() || cachedFuture.isCompletedExceptionally()) {
        return Optional.empty();
    }
    // The future completed normally, so join() returns immediately.
    final Topic cachedTopic = cachedFuture.join().orElse(null);
    // Reference comparison on purpose: a different instance under the same name must not match.
    // "topic" is non-null here, so an empty cached value can never compare equal.
    if (cachedTopic == topic) {
        return Optional.of(cachedFuture);
    }
    return Optional.empty();
}

private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
public CompletableFuture<Void> removeTopicFutureFromCache(String topic,
CompletableFuture<Optional<Topic>> createTopicFuture) {
TopicName topicName = TopicName.get(topic);
return pulsar.getNamespaceService().getBundleAsync(topicName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
// topic GC iterates over topics map and removing from the map with the same thread creates
// deadlock. so, execute it in different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(NonPersistentTopic.this);
brokerService.removeTopicFutureFromCache(topic, createFuture);
unregisterTopicPolicyListener();
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
Expand Down Expand Up @@ -555,7 +555,7 @@ public CompletableFuture<Void> close(
brokerService.executor().execute(() -> {

if (disconnectClients) {
brokerService.removeTopicFromCache(NonPersistentTopic.this);
brokerService.removeTopicFutureFromCache(topic, createFuture);
unregisterTopicPolicyListener();
}
closeFuture.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1530,7 +1530,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(PersistentTopic.this);
brokerService.removeTopicFutureFromCache(topic, createFuture);

dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

Expand Down Expand Up @@ -1807,7 +1807,7 @@ private boolean isClosed() {
}

private void disposeTopic(CompletableFuture<?> closeFuture) {
brokerService.removeTopicFromCache(PersistentTopic.this)
brokerService.removeTopicFutureFromCache(topic, createFuture)
.thenRun(() -> {
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,25 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
private final AbortedTxnProcessor.SnapshotType snapshotType;
private final MaxReadPositionCallBack maxReadPositionCallBack;

/**
 * Builds the aborted-transaction processor for the given topic, selected by the broker's
 * {@code isTransactionBufferSegmentedSnapshotEnabled()} configuration flag.
 *
 * @param topic the topic the processor is created for
 * @return a {@link SnapshotSegmentAbortedTxnProcessorImpl} when the flag is set,
 *         otherwise a {@link SingleSnapshotAbortedTxnProcessorImpl}
 */
private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic) {
    final boolean segmentedSnapshotEnabled = topic.getBrokerService()
            .getPulsar()
            .getConfiguration()
            .isTransactionBufferSegmentedSnapshotEnabled();
    if (segmentedSnapshotEnabled) {
        return new SnapshotSegmentAbortedTxnProcessorImpl(topic);
    }
    return new SingleSnapshotAbortedTxnProcessorImpl(topic);
}

/**
 * Resolves the snapshot type for the given topic from the broker's
 * {@code isTransactionBufferSegmentedSnapshotEnabled()} configuration flag.
 *
 * @param topic the topic whose broker configuration is consulted
 * @return {@code SnapshotType.Segment} when the flag is set, otherwise {@code SnapshotType.Single}
 */
private static AbortedTxnProcessor.SnapshotType determineSnapshotType(PersistentTopic topic) {
    final boolean segmentedSnapshotEnabled = topic.getBrokerService()
            .getPulsar()
            .getConfiguration()
            .isTransactionBufferSegmentedSnapshotEnabled();
    if (segmentedSnapshotEnabled) {
        return AbortedTxnProcessor.SnapshotType.Segment;
    }
    return AbortedTxnProcessor.SnapshotType.Single;
}

/**
 * Creates a transaction buffer for the given topic.
 *
 * <p>Delegates to the three-argument constructor, passing the processor returned by
 * {@code createSnapshotProcessor(topic)} and the type returned by {@code determineSnapshotType(topic)}.
 *
 * @param topic the persistent topic this buffer belongs to
 */
public TopicTransactionBuffer(PersistentTopic topic) {
this(topic, createSnapshotProcessor(topic), determineSnapshotType(topic));
}

@VisibleForTesting
TopicTransactionBuffer(PersistentTopic topic, AbortedTxnProcessor snapshotAbortedTxnProcessor,
AbortedTxnProcessor.SnapshotType snapshotType) {
super(State.None);
this.topic = topic;
this.timer = topic.getBrokerService().getPulsar().getTransactionTimer();
Expand All @@ -118,13 +136,8 @@ public TopicTransactionBuffer(PersistentTopic topic) {
this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
this.maxReadPosition = topic.getManagedLedger().getLastConfirmedEntry();
if (topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()) {
snapshotAbortedTxnProcessor = new SnapshotSegmentAbortedTxnProcessorImpl(topic);
snapshotType = AbortedTxnProcessor.SnapshotType.Segment;
} else {
snapshotAbortedTxnProcessor = new SingleSnapshotAbortedTxnProcessorImpl(topic);
snapshotType = AbortedTxnProcessor.SnapshotType.Single;
}
this.snapshotAbortedTxnProcessor = snapshotAbortedTxnProcessor;
this.snapshotType = snapshotType;
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.recover();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
// (3) remove topic and managed-ledger from broker which means topic is not closed gracefully
consumer.close();
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic);
pulsar.getBrokerService().removeTopicFutureFromCache(topic.getName(), topic.getCreateFuture());
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getDefaultManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
Expand Down Expand Up @@ -249,7 +249,7 @@ public void testSkipCorruptDataLedger() throws Exception {

// clean managed-ledger and recreate topic to clean any data from the cache
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic);
pulsar.getBrokerService().removeTopicFutureFromCache(topic.getName(), topic.getCreateFuture());
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getDefaultManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer.impl;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicFactory;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Verifies that a persistent topic does not stay in the broker's topic cache when the recovery
 * of its transaction buffer fails.
 *
 * <p>The test installs {@link MyTopicFactory} so that persistent topics are created as
 * {@link MyPersistentTopic}, and swaps in a {@link TransactionBufferProvider} whose snapshot
 * recovery fails once topic initialization has reached the deduplication check.
 */
@Slf4j
@Test(groups = "broker")
public class TransactionPersistentTopicTest extends ProducerConsumerBase {

    /**
     * Counted down by {@link MyPersistentTopic#checkDeduplicationStatus()} and awaited by the
     * mocked snapshot recovery, so that the recovery failure is reported only after topic
     * initialization has reached the deduplication check.
     */
    private static final CountDownLatch TOPIC_INIT_SUCCESS_SIGNAL = new CountDownLatch(1);

    @BeforeClass(alwaysRun = true)
    @Override
    protected void setup() throws Exception {
        // Intercept when the `topicFuture` is about to complete and wait until the topic close operation finishes.
        conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
        conf.setTransactionCoordinatorEnabled(true);
        conf.setBrokerDeduplicationEnabled(false);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @AfterClass(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Loads a topic whose transaction buffer recovery fails and asserts that the load future
     * completes normally while the topic eventually disappears from the broker's topic map.
     */
    @Test
    public void testNoOrphanClosedTopicIfTxnInternalFailed() {
        String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");

        BrokerService brokerService = pulsar.getBrokerService();

        // 1. Install a transaction buffer provider whose snapshot recovery waits for the topic
        //    initialization signal and then fails.
        TransactionBufferProvider mockTransactionBufferProvider = originTopic -> {
            AbortedTxnProcessor abortedTxnProcessor = mock(AbortedTxnProcessor.class);
            doAnswer(invocation -> {
                TOPIC_INIT_SUCCESS_SIGNAL.await();
                return CompletableFuture.failedFuture(new RuntimeException("Mock recovery failed"));
            }).when(abortedTxnProcessor).recoverFromSnapshot();
            when(abortedTxnProcessor.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
            return new TopicTransactionBuffer(
                    (PersistentTopic) originTopic, abortedTxnProcessor, AbortedTxnProcessor.SnapshotType.Single);
        };
        TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider();
        pulsar.setTransactionBufferProvider(mockTransactionBufferProvider);

        try {
            // 2. Trigger topic creation and assert that the load future completes successfully.
            CompletableFuture<Optional<Topic>> firstLoad = brokerService.getTopic(tpName, true);
            Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
                    .pollInterval(200, TimeUnit.MILLISECONDS)
                    .untilAsserted(() -> {
                        assertTrue(firstLoad.isDone());
                        assertFalse(firstLoad.isCompletedExceptionally());
                    });

            // 3. Assert that the topic has been removed from the cache.
            Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
                    .pollInterval(500, TimeUnit.MILLISECONDS)
                    .untilAsserted(() -> {
                        assertFalse(brokerService.getTopics().containsKey(tpName));
                    });
        } finally {
            // 4. Always restore the original provider, even when an assertion above fails, so the
            //    mock does not stay installed on the shared pulsar instance.
            pulsar.setTransactionBufferProvider(originalTransactionBufferProvider);
        }
    }

    /**
     * Topic factory that creates {@link MyPersistentTopic} for every topic class other than
     * {@link NonPersistentTopic}.
     */
    public static class MyTopicFactory implements TopicFactory {
        // The caller-supplied topicClazz decides which branch runs; the casts to T cannot be
        // checked at compile time.
        @SuppressWarnings("unchecked")
        @Override
        public <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService,
                                          Class<T> topicClazz) {
            try {
                if (topicClazz == NonPersistentTopic.class) {
                    return (T) new NonPersistentTopic(topic, brokerService);
                } else {
                    return (T) new MyPersistentTopic(topic, ledger, brokerService);
                }
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }

        @Override
        public void close() throws IOException {
            // No-op
        }
    }

    /**
     * Persistent topic that signals {@code TOPIC_INIT_SUCCESS_SIGNAL} and then pauses when its
     * deduplication status is checked.
     */
    public static class MyPersistentTopic extends PersistentTopic {

        public MyPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
            super(topic, ledger, brokerService);
        }

        @SneakyThrows
        @Override
        public CompletableFuture<Void> checkDeduplicationStatus() {
            // Let the mocked snapshot recovery proceed (and fail).
            TOPIC_INIT_SUCCESS_SIGNAL.countDown();
            // Sleep 1s to give the failed transaction buffer recovery time to close the topic.
            Thread.sleep(1000);
            return CompletableFuture.completedFuture(null);
        }
    }

}

0 comments on commit 6cdf378

Please sign in to comment.