Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[broker] Optimize TopicPolicy#maxProducersPerTopic with HierarchyTopicPolicies #13082

Merged
merged 2 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public AbstractTopic(String topic, BrokerService brokerService) {

protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type ->
Expand All @@ -164,6 +165,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
return;
}
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
Expand All @@ -179,6 +181,7 @@ private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicie
config.isBrokerDeleteInactiveTopicsEnabled()));

topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic());
topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
//init backlogQuota
topicPolicies.getBackLogQuotaMap()
Expand All @@ -192,16 +195,7 @@ private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicie
}

protected boolean isProducersExceeded() {
Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);

if (maxProducers == null) {
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
.getConfiguration().getMaxProducersPerTopic();
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
if (maxProducers > 0 && maxProducers <= producers.size()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
Expand Down Expand Up @@ -965,16 +966,51 @@ public void testGetMaxProducerApplied() throws Exception {
assertEquals(admin.topicPolicies().getMaxProducers(topic, true).intValue(), conf.getMaxProducersPerTopic());
}

private void waitTopicPoliciesApplied(String topic, int partitions,
java.util.function.Consumer<HierarchyTopicPolicies> condition) {
TopicName topicName = TopicName.get(topic);
if (partitions > 0) {
for (int i = 0; i < partitions; i++) {
String partition = topicName.getPartition(i).toString();
Awaitility.await().untilAsserted(() -> {
Topic t = pulsar.getBrokerService().getTopicIfExists(partition).get().get();
assertTrue(t instanceof AbstractTopic);
condition.accept(((AbstractTopic) t).getHierarchyTopicPolicies());
});
}
} else {
Awaitility.await().untilAsserted(() -> {
Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get();
assertTrue(t instanceof AbstractTopic);
condition.accept(((AbstractTopic) t).getHierarchyTopicPolicies());
});
}
}

@Test
public void testSetMaxProducers() throws Exception {
Integer maxProducers = 2;
log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic);

//broker level setting is 4
conf.setMaxProducersPerTopic(4);
admin.topics().createPartitionedTopic(persistenceTopic, 2);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 4);
});
//ns level setting is 3
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 3);
});
//topic level setting is 2
admin.topicPolicies().setMaxProducers(persistenceTopic, maxProducers);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 2);
});

Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxProducers(persistenceTopic), maxProducers));
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxProducers(persistenceTopic),
maxProducers));

Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistenceTopic).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(persistenceTopic).create();
Expand All @@ -990,6 +1026,37 @@ public void testSetMaxProducers() throws Exception {
Assert.assertNotNull(producer2);
Assert.assertNull(producer3);

admin.topicPolicies().removeMaxProducers(persistenceTopic);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 3);
});
producer3 = pulsarClient.newProducer().topic(persistenceTopic).create();

Producer<byte[]> producer4;
try {
producer4 = pulsarClient.newProducer().topic(persistenceTopic).create();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max producers limit on topic level.");
}

admin.namespaces().removeMaxProducersPerTopic(myNamespace);
waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> {
assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 4);
});
producer4 = pulsarClient.newProducer().topic(persistenceTopic).create();

try {
Producer<byte[]> producer5 = pulsarClient.newProducer().topic(persistenceTopic).create();
Assert.fail();
} catch (PulsarClientException e) {
log.info("Topic reached max producers limit on topic level.");
}
producer1.close();
producer2.close();
producer3.close();
producer4.close();

admin.topics().deletePartitionedTopic(persistenceTopic, true);
admin.topics().deletePartitionedTopic(testTopic, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ public void testProducerOverwrite() throws Exception {

private void testMaxProducers() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
topic.initialize();
String role = "appid1";
// 1. add producer1
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role,
Expand Down Expand Up @@ -577,8 +578,11 @@ public void testMaxProducersForNamespace() throws Exception {
.thenReturn(Optional.of(policies));

when(pulsar.getPulsarResources().getNamespaceResources()
.getPolicies(TopicName.get(successTopicName).getNamespaceObject()))
.thenReturn(Optional.of(policies));
.getPolicies(TopicName.get(successTopicName).getNamespaceObject()))
.thenReturn(Optional.of(policies));
when(pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject()))
.thenReturn(CompletableFuture.completedFuture(Optional.of(policies)));
testMaxProducers();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Boolean> deduplicationEnabled;
final PolicyHierarchyValue<InactiveTopicPolicies> inactiveTopicPolicies;
final PolicyHierarchyValue<Integer> maxSubscriptionsPerTopic;
final PolicyHierarchyValue<Integer> maxProducersPerTopic;
final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> backLogQuotaMap;
final PolicyHierarchyValue<Integer> topicMaxMessageSize;

public HierarchyTopicPolicies() {
deduplicationEnabled = new PolicyHierarchyValue<>();
inactiveTopicPolicies = new PolicyHierarchyValue<>();
maxSubscriptionsPerTopic = new PolicyHierarchyValue<>();
maxProducersPerTopic = new PolicyHierarchyValue<>();
backLogQuotaMap = new ImmutableMap.Builder<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>()
.put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>())
.put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())
Expand Down