diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8c03f432fba84..85c8456cc7887 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -997,6 +997,13 @@ public CompletableFuture> getTopic(final String topic, boolean c final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); return topics.computeIfAbsent(topicName.toString(), (tpName) -> { return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); + }).thenCompose(optionalTopic -> { + if (!optionalTopic.isPresent() && createIfMissing) { + log.warn("[{}] Try to recreate the topic with createIfMissing=true " + + "but the returned topic is empty", topicName); + return getTopic(topic, createIfMissing, properties); + } + return CompletableFuture.completedFuture(optionalTopic); }); }); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index d4a695663be8e..28020626c4bcc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -28,7 +28,10 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -40,6 +43,7 @@ import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.powermock.reflect.Whitebox; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -55,6 +59,9 @@ protected void setup() throws Exception { conf.setAllowAutoTopicCreationType("partitioned"); conf.setAllowAutoTopicCreation(true); conf.setDefaultNumPartitions(3); + conf.setForceDeleteNamespaceAllowed(true); + conf.setSystemTopicEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); super.internalSetup(); super.producerBaseSetup(); } @@ -188,4 +195,56 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() } } + + @Test + public void testClientWithAutoCreationGotNotFoundException() throws PulsarAdminException, PulsarClientException { + final String namespace = "public/test_1"; + final String topicName = "persistent://public/test_1/test_auto_creation_got_not_found" + + System.currentTimeMillis(); + final int retryTimes = 30; + admin.namespaces().createNamespace(namespace); + admin.namespaces().setAutoTopicCreation(namespace, AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType("non-partitioned") + .build()); + + @Cleanup("shutdown") + final ExecutorService executor1 = Executors.newSingleThreadExecutor(); + + @Cleanup("shutdown") + final ExecutorService executor2 = Executors.newSingleThreadExecutor(); + + for (int i = 0; i < retryTimes; i++) { + final CompletableFuture adminListSub = CompletableFuture.runAsync(() -> { + try { + admin.topics().getSubscriptions(topicName); + } catch (PulsarAdminException e) { + throw new RuntimeException(e); + } + }, executor1); + + final CompletableFuture> consumerSub = CompletableFuture.supplyAsync(() -> { + try { + return pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub-1") + .subscribe(); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + }, executor2); + + try { + adminListSub.join(); + } catch (Throwable ex) { + // we don't care the exception. + } + + consumerSub.join().close(); + admin.topics().delete(topicName, true); + } + + admin.namespaces().deleteNamespace(namespace, true); + } + }