Skip to content

Commit

Permalink
[fix][broker] Create replicated subscriptions for new partitions when…
Browse files Browse the repository at this point in the history
… needed (apache#18659)
  • Loading branch information
lhotari authored and lifepuzzlefun committed Jan 10, 2023
1 parent 195b93c commit 0a4f329
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4519,12 +4519,13 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
// We must not re-create non-durable subscriptions on the new partitions
return;
}
boolean replicated = ss.isReplicated();

for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();
CompletableFuture<Void> future = new CompletableFuture<>();
admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest).whenComplete((__, ex) -> {
subscription, MessageId.latest, replicated).whenComplete((__, ex) -> {
if (ex == null) {
future.complete(null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
Expand Down Expand Up @@ -1169,6 +1170,40 @@ public void testUpdateGlobalTopicPartition() throws Exception {
consumer2.close();
}

@Test
public void testIncrementPartitionsOfTopicWithReplicatedSubscription() throws Exception {
final String cluster1 = pulsar1.getConfig().getClusterName();
final String cluster2 = pulsar2.getConfig().getClusterName();
final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1");
int startPartitions = 4;
int newPartitions = 8;
final String subscriberName = "sub1";
admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2));
admin1.topics().createPartitionedTopic(topicName, startPartitions);

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subscriberName)
.replicateSubscriptionState(true)
.subscribe();

admin1.topics().updatePartitionedTopic(topicName, newPartitions);

assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, newPartitions);

Map<String, Boolean> replicatedSubscriptionStatus =
admin1.topics().getReplicatedSubscriptionStatus(topicName, subscriberName);
assertEquals(replicatedSubscriptionStatus.size(), newPartitions);
for (Map.Entry<String, Boolean> replicatedStatusForPartition : replicatedSubscriptionStatus.entrySet()) {
assertTrue(replicatedStatusForPartition.getValue(),
"Replicated status is invalid for " + replicatedStatusForPartition.getKey());
}
consumer1.close();
}

@DataProvider(name = "topicPrefix")
public static Object[][] topicPrefix() {
return new Object[][] { { "persistent://", "/persistent" }, { "non-persistent://", "/non-persistent" } };
Expand Down

0 comments on commit 0a4f329

Please sign in to comment.