Skip to content

Commit

Permalink
Optimize AbstractTopic#inactiveTopicPolicies with PolicyHierarchyValue
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiang Haiting committed Nov 9, 2021
1 parent 849e4dc commit a6d8d24
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyHierarchyValue;
Expand Down Expand Up @@ -83,7 +82,7 @@ public abstract class AbstractTopic implements Topic {
protected volatile boolean isFenced;

// Inactive topic policies
protected InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
protected final PolicyHierarchyValue<InactiveTopicPolicies> inactiveTopicPolicies;

// Timestamp of when this topic was last seen active
protected volatile long lastActive;
Expand Down Expand Up @@ -143,23 +142,23 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.brokerService = brokerService;
this.producers = new ConcurrentHashMap<>();
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
this.inactiveTopicPolicies.setDeleteWhileInactive(brokerService.pulsar().getConfiguration()
.isBrokerDeleteInactiveTopicsEnabled());
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration()
.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration()
.getBrokerDeleteInactiveTopicsMode());
this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(brokerService.pulsar().getConfiguration()
.getMaxMessageSizeCheckIntervalInSeconds());
ServiceConfiguration config = brokerService.pulsar().getConfiguration();
this.replicatorPrefix = config.getReplicatorPrefix();

inactiveTopicPolicies = new PolicyHierarchyValue<>();
inactiveTopicPolicies.updateBrokerValue(new InactiveTopicPolicies(
config.getBrokerDeleteInactiveTopicsMode(),
config.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
config.isBrokerDeleteInactiveTopicsEnabled()));

this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(
config.getMaxMessageSizeCheckIntervalInSeconds());

maxSubscriptionsPerTopic = new PolicyHierarchyValue<>();
maxSubscriptionsPerTopic.updateBrokerValue(brokerService.pulsar().getConfiguration()
.getMaxSubscriptionsPerTopic());
maxSubscriptionsPerTopic.updateBrokerValue(config.getMaxSubscriptionsPerTopic());

this.lastActive = System.nanoTime();
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(Optional.empty());
}

Expand Down Expand Up @@ -869,30 +868,19 @@ public long getBytesOutCounter() {
}

public boolean isDeleteWhileInactive() {
return this.inactiveTopicPolicies.isDeleteWhileInactive();
return this.inactiveTopicPolicies.get().isDeleteWhileInactive();
}

public boolean deletePartitionedTopicMetadataWhileInactive() {
return brokerService.pulsar().getConfiguration().isBrokerDeleteInactivePartitionedTopicMetadataEnabled();
}

public void setDeleteWhileInactive(boolean deleteWhileInactive) {
this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}

protected abstract boolean isTerminated();

private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);

public InactiveTopicPolicies getInactiveTopicPolicies() {
return inactiveTopicPolicies;
}

public void resetInactiveTopicPolicies(InactiveTopicDeleteMode inactiveTopicDeleteMode
, int maxInactiveDurationSeconds, boolean deleteWhileInactive) {
inactiveTopicPolicies.setInactiveTopicDeleteMode(inactiveTopicDeleteMode);
inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
return inactiveTopicPolicies.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractTopic;
Expand Down Expand Up @@ -152,11 +151,8 @@ public CompletableFuture<Void> initialize() {
Policies policies = optPolicies.get();
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
if (policies.inactive_topic_policies != null) {
inactiveTopicPolicies = policies.inactive_topic_policies;
}
inactiveTopicPolicies.updateNamespaceValue(policies.inactive_topic_policies);
setSchemaCompatibilityStrategy(policies);

schemaValidationEnforced = policies.schema_validation_enforced;
}
});
Expand Down Expand Up @@ -866,7 +862,7 @@ public void checkGC() {
// This topic is not included in GC
return;
}
int maxInactiveDurationInSec = inactiveTopicPolicies.getMaxInactiveDurationSeconds();
int maxInactiveDurationInSec = inactiveTopicPolicies.get().getMaxInactiveDurationSeconds();
if (isActive()) {
lastActive = System.nanoTime();
} else {
Expand Down Expand Up @@ -979,14 +975,8 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));

if (data.inactive_topic_policies != null) {
this.inactiveTopicPolicies = data.inactive_topic_policies;
} else {
ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
cfg.isBrokerDeleteInactiveTopicsEnabled());
}
this.inactiveTopicPolicies.updateNamespaceValue(data.inactive_topic_policies);

return checkReplicationAndRetryOnFailure();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractTopic;
Expand Down Expand Up @@ -324,9 +323,9 @@ public CompletableFuture<Void> initialize() {
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;

schemaValidationEnforced = policies.schema_validation_enforced;
if (policies.inactive_topic_policies != null) {
inactiveTopicPolicies = policies.inactive_topic_policies;
}

inactiveTopicPolicies.updateNamespaceValue(policies.inactive_topic_policies);

updateUnackedMessagesAppliedOnSubscription(policies);
updateUnackedMessagesExceededOnConsumer(policies);
}).exceptionally(ex -> {
Expand Down Expand Up @@ -2198,8 +2197,8 @@ public void checkGC() {
// This topic is not included in GC
return;
}
InactiveTopicDeleteMode deleteMode = inactiveTopicPolicies.getInactiveTopicDeleteMode();
int maxInactiveDurationInSec = inactiveTopicPolicies.getMaxInactiveDurationSeconds();
InactiveTopicDeleteMode deleteMode = inactiveTopicPolicies.get().getInactiveTopicDeleteMode();
int maxInactiveDurationInSec = inactiveTopicPolicies.get().getMaxInactiveDurationSeconds();
if (isActive(deleteMode)) {
lastActive = System.nanoTime();
} else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
Expand Down Expand Up @@ -2415,16 +2414,8 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
}
//If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
Optional<TopicPolicies> topicPolicies = getTopicPolicies();
if (data.inactive_topic_policies != null) {
if (!topicPolicies.isPresent() || !topicPolicies.get().isInactiveTopicPoliciesSet()) {
this.inactiveTopicPolicies = data.inactive_topic_policies;
}
} else {
ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
cfg.isBrokerDeleteInactiveTopicsEnabled());
}

inactiveTopicPolicies.updateNamespaceValue(data.inactive_topic_policies);

initializeRateLimiterIfNeeded(Optional.ofNullable(data));

Expand Down Expand Up @@ -3098,18 +3089,8 @@ public void onUpdate(TopicPolicies policies) {

maxSubscriptionsPerTopic.updateTopicValue(policies.getMaxSubscriptionsPerTopic());

if (policies.isInactiveTopicPoliciesSet()) {
inactiveTopicPolicies = policies.getInactiveTopicPolicies();
} else if (namespacePolicies.isPresent() && namespacePolicies.get().inactive_topic_policies != null) {
//topic-level policies is null , so use namespace-level
inactiveTopicPolicies = namespacePolicies.get().inactive_topic_policies;
} else {
//namespace-level policies is null , so use broker level
ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
cfg.isBrokerDeleteInactiveTopicsEnabled());
}
inactiveTopicPolicies.updateTopicValue(policies.getInactiveTopicPolicies());

updateUnackedMessagesAppliedOnSubscription(namespacePolicies.orElse(null));
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
if (this.subscribeRateLimiter.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@
*/
package org.apache.pulsar.broker.service;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -32,15 +27,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -204,40 +197,41 @@ public void testTopicPolicyUpdateAndClean() throws Exception {

InactiveTopicPolicies policies;
//wait for zk
Awaitility.await().until(()
-> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
Awaitility.await().until(() -> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get()
.get()).getInactiveTopicPolicies();
return temp.isDeleteWhileInactive();
});
policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get()
.get()).getInactiveTopicPolicies();

Assert.assertTrue(policies.isDeleteWhileInactive());
assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace));

admin.namespaces().removeInactiveTopicPolicies(namespace);
Awaitility.await().until(()
-> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
Awaitility.await().until(() -> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get()
.get()).getInactiveTopicPolicies();
return temp.getMaxInactiveDurationSeconds() == 1000;
});
assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
, defaultPolicy);
assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).getInactiveTopicPolicies(), defaultPolicy);

policies = ((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get()
.get()).getInactiveTopicPolicies();
Assert.assertTrue(policies.isDeleteWhileInactive());
assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace2));

admin.namespaces().removeInactiveTopicPolicies(namespace2);
Awaitility.await().until(()
-> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
Awaitility.await().until(() -> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get()
.get()).getInactiveTopicPolicies();
return temp.getMaxInactiveDurationSeconds() == 1000;
});
assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies
assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).getInactiveTopicPolicies()
, defaultPolicy);
}

Expand Down Expand Up @@ -284,7 +278,8 @@ public void testDeleteWhenNoSubscriptionsWithMultiConfig() throws Exception {

//wait for zk
Awaitility.await().until(() -> {
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get()
.get()).getInactiveTopicPolicies();
return temp.isDeleteWhileInactive();
});

Expand Down Expand Up @@ -431,7 +426,7 @@ public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
Awaitility.await().until(()
-> admin.topics().getInactiveTopicPolicies(topic) != null);
InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService()
.getTopic(topic, false).get().get()).inactiveTopicPolicies;
.getTopic(topic, false).get().get()).getInactiveTopicPolicies();
Assert.assertTrue(policies.isDeleteWhileInactive());
assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
Expand All @@ -441,10 +436,10 @@ public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
//Only the broker-level policies is set, so after removing the topic-level policies
// , the topic will use the broker-level policies
Awaitility.await().untilAsserted(()
-> assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
-> assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).getInactiveTopicPolicies()
, defaultPolicy));

policies = ((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
policies = ((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).getInactiveTopicPolicies();
Assert.assertTrue(policies.isDeleteWhileInactive());
assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
Expand All @@ -456,15 +451,15 @@ public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
//wait for zk
Awaitility.await().until(() -> {
InactiveTopicPolicies tempPolicies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false)
.get().get()).inactiveTopicPolicies;
.get().get()).getInactiveTopicPolicies();
return inactiveTopicPolicies.equals(tempPolicies);
});
admin.topics().removeInactiveTopicPolicies(topic2);
// The cache has been updated, but the system-event may not be consumed yet
// ,so wait for topic-policies update event
Awaitility.await().untilAsserted(() -> {
InactiveTopicPolicies nsPolicies = ((PersistentTopic) pulsar.getBrokerService()
.getTopic(topic2, false).get().get()).inactiveTopicPolicies;
.getTopic(topic2, false).get().get()).getInactiveTopicPolicies();
assertEquals(nsPolicies.getMaxInactiveDurationSeconds(), 999);
});

Expand Down

0 comments on commit a6d8d24

Please sign in to comment.