From 08f945876b60cdaf1677d32a1ddb7a7699fb3875 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 26 Nov 2021 23:08:00 +0800 Subject: [PATCH 1/2] optimize topic policy of max_producers_per_topic --- .../pulsar/broker/service/AbstractTopic.java | 14 ++-- .../broker/admin/TopicPoliciesTest.java | 71 ++++++++++++++++++- .../policies/data/HierarchyTopicPolicies.java | 2 + 3 files changed, 75 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 9e8c952a0a0c4..00f705fad22f4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -148,6 +148,7 @@ public AbstractTopic(String topic, BrokerService brokerService) { protected void updateTopicPolicy(TopicPolicies data) { topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic()); + topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic()); topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies()); topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled()); Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type -> @@ -164,6 +165,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { return; } topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic); + topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic); topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies); topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled); Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach( @@ -179,6 +181,7 @@ private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicie config.isBrokerDeleteInactiveTopicsEnabled())); topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic()); + topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic()); topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled()); //init backlogQuota topicPolicies.getBackLogQuotaMap() @@ -192,16 +195,7 @@ private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicie } protected boolean isProducersExceeded() { - Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null); - - if (maxProducers == null) { - Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources() - .getPoliciesIfCached(TopicName.get(topic).getNamespaceObject()) - .orElseGet(() -> new Policies()); - maxProducers = policies.max_producers_per_topic; - } - maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar() - .getConfiguration().getMaxProducersPerTopic(); + Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get(); if (maxProducers > 0 && maxProducers <= producers.size()) { return true; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 4acab8ed163ed..9a225f00cddcf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -71,6 +71,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -965,16 +966,51 @@ public void testGetMaxProducerApplied() throws Exception { assertEquals(admin.topicPolicies().getMaxProducers(topic, true).intValue(), conf.getMaxProducersPerTopic()); } + private void waitTopicPoliciesApplied(String topic, int partitions, + java.util.function.Consumer condition) { + TopicName topicName = TopicName.get(topic); + if (partitions > 0) { + for (int i = 0; i < partitions; i++) { + String partition = topicName.getPartition(i).toString(); + Awaitility.await().untilAsserted(() -> { + Topic t = pulsar.getBrokerService().getTopicIfExists(partition).get().get(); + assertTrue(t instanceof AbstractTopic); + condition.accept(((AbstractTopic) t).getHierarchyTopicPolicies()); + }); + } + } else { + Awaitility.await().untilAsserted(() -> { + Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + assertTrue(t instanceof AbstractTopic); + condition.accept(((AbstractTopic) t).getHierarchyTopicPolicies()); + }); + } + } + @Test public void testSetMaxProducers() throws Exception { Integer maxProducers = 2; log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic); - + //broker level setting is 4 + conf.setMaxProducersPerTopic(4); admin.topics().createPartitionedTopic(persistenceTopic, 2); + waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> { + assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 4); + }); + //ns level setting is 3 + admin.namespaces().setMaxProducersPerTopic(myNamespace, 3); + waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> { + assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 3); + }); + //topic level setting is 2 admin.topicPolicies().setMaxProducers(persistenceTopic, maxProducers); + waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> { + assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 2); + }); Awaitility.await() - .untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxProducers(persistenceTopic), maxProducers)); + .untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getMaxProducers(persistenceTopic), + maxProducers)); Producer producer1 = pulsarClient.newProducer().topic(persistenceTopic).create(); Producer producer2 = pulsarClient.newProducer().topic(persistenceTopic).create(); @@ -990,6 +1026,37 @@ public void testSetMaxProducers() throws Exception { Assert.assertNotNull(producer2); Assert.assertNull(producer3); + admin.topicPolicies().removeMaxProducers(persistenceTopic); + waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> { + assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 3); + }); + producer3 = pulsarClient.newProducer().topic(persistenceTopic).create(); + + Producer producer4; + try { + producer4 = pulsarClient.newProducer().topic(persistenceTopic).create(); + Assert.fail(); + } catch (PulsarClientException e) { + log.info("Topic reached max producers limit on topic level."); + } + + admin.namespaces().removeMaxProducersPerTopic(myNamespace); + waitTopicPoliciesApplied(persistenceTopic, 2, hierarchyTopicPolicies -> { + assertEquals((int) hierarchyTopicPolicies.getMaxProducersPerTopic().get(), 4); + }); + producer4 = pulsarClient.newProducer().topic(persistenceTopic).create(); + + try { + Producer producer5 = pulsarClient.newProducer().topic(persistenceTopic).create(); + Assert.fail(); + } catch (PulsarClientException e) { + log.info("Topic reached max producers limit on topic level."); + } + producer1.close(); + producer2.close(); + producer3.close(); + producer4.close(); + admin.topics().deletePartitionedTopic(persistenceTopic, true); admin.topics().deletePartitionedTopic(testTopic, true); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index a8087b8c39101..171a3d4153478 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -32,6 +32,7 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue deduplicationEnabled; final PolicyHierarchyValue inactiveTopicPolicies; final PolicyHierarchyValue maxSubscriptionsPerTopic; + final PolicyHierarchyValue maxProducersPerTopic; final Map> backLogQuotaMap; final PolicyHierarchyValue topicMaxMessageSize; @@ -39,6 +40,7 @@ public HierarchyTopicPolicies() { deduplicationEnabled = new PolicyHierarchyValue<>(); inactiveTopicPolicies = new PolicyHierarchyValue<>(); maxSubscriptionsPerTopic = new PolicyHierarchyValue<>(); + maxProducersPerTopic = new PolicyHierarchyValue<>(); backLogQuotaMap = new ImmutableMap.Builder>() .put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>()) .put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>()) From 8fee96001a4c54bbf064d1aace9c8343f8210f93 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Thu, 2 Dec 2021 19:32:26 +0800 Subject: [PATCH 2/2] fix test --- .../apache/pulsar/broker/service/PersistentTopicTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 74081b9084af7..d42b44c12f309 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -532,6 +532,7 @@ public void testProducerOverwrite() throws Exception { private void testMaxProducers() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + topic.initialize(); String role = "appid1"; // 1. add producer1 Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, @@ -577,8 +578,11 @@ public void testMaxProducersForNamespace() throws Exception { .thenReturn(Optional.of(policies)); when(pulsar.getPulsarResources().getNamespaceResources() - .getPolicies(TopicName.get(successTopicName).getNamespaceObject())) - .thenReturn(Optional.of(policies)); + .getPolicies(TopicName.get(successTopicName).getNamespaceObject())) + .thenReturn(Optional.of(policies)); + when(pulsar.getPulsarResources().getNamespaceResources() + .getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject())) + .thenReturn(CompletableFuture.completedFuture(Optional.of(policies))); testMaxProducers(); }