Skip to content

Commit

Permalink
Optimize topic policy with HierarchyTopicPolicies about dispatchRate
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed Feb 15, 2022
1 parent b22f706 commit f9297ea
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
return this.topicPolicies.getSchemaCompatibilityStrategy().get();
}

public DispatchRateImpl getDispatchRate() {
return this.topicPolicies.getDispatchRate().get();
}

private SchemaCompatibilityStrategy formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
return strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy;
}
Expand Down Expand Up @@ -195,6 +199,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
topicPolicies.getDispatchRate().updateTopicValue(normalize(data.getDispatchRate()));
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
Expand Down Expand Up @@ -236,6 +241,15 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
updateNamespaceSubscriptionDispatchRate(namespacePolicies,
brokerService.getPulsar().getConfig().getClusterName());
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
}

private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
if (dispatchRate == null) {
dispatchRate = namespacePolicies.clusterDispatchRate.get(cluster);
}
topicPolicies.getDispatchRate().updateNamespaceValue(normalize(dispatchRate));
}

private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies, String cluster) {
Expand Down Expand Up @@ -309,6 +323,15 @@ private void updateTopicPolicyByBrokerConfig() {
topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(subscriptionDispatchRateInBroker(config));
topicPolicies.getSchemaCompatibilityStrategy()
.updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config));
}

private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) {
return DispatchRateImpl.builder()
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerTopicInMsg())
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerTopicInByte())
.ratePeriodInSecond(1)
.build();
}

private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration config) {
Expand Down Expand Up @@ -1131,8 +1154,14 @@ public void updateBrokerSubscriptionTypesEnabled() {
subTypeStringsToEnumSet(brokerService.pulsar().getConfiguration().getSubscriptionTypesEnabled()));
}


public void updateBrokerSubscriptionDispatchRate() {
topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
}

public void updateBrokerDispatchRate() {
topicPolicies.getDispatchRate().updateBrokerValue(
dispatchRateInBroker(brokerService.pulsar().getConfiguration()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2237,6 +2237,9 @@ private void updateTopicMessageDispatchRate() {
this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerDispatchRate();
}
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().updateDispatchRate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ private DispatchRate createDispatchRate() {
*/
public void updateDispatchRate() {
switch (type) {
case TOPIC:
updateDispatchRate(topic.getDispatchRate());
return;
case SUBSCRIPTION:
updateDispatchRate(topic.getSubscriptionDispatchRate());
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
initializeRateLimiterIfNeeded(Optional.empty());
registerTopicPolicyListener();

this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
Expand Down Expand Up @@ -311,13 +310,16 @@ public CompletableFuture<Void> initialize() {
.thenAccept(optPolicies -> {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
initializeRateLimiterIfNeeded(Optional.empty());
return;
}

Policies policies = optPolicies.get();

this.updateTopicPolicyByNamespacePolicy(policies);

initializeRateLimiterIfNeeded(Optional.empty());

this.isEncryptionRequired = policies.encryption_required;

isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
Expand Down Expand Up @@ -355,8 +357,8 @@ public CompletableFuture<Void> initialize() {
private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
synchronized (dispatchRateLimiter) {
// dispatch rate limiter for topic
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(brokerService, policies, topic, Type.TOPIC)) {
if (!dispatchRateLimiter.isPresent()
&& DispatchRateLimiter.isDispatchRateEnabled(topicPolicies.getDispatchRate().get())) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}
boolean isDispatchRateNeeded = SubscribeRateLimiter.isDispatchRateNeeded(brokerService, policies, topic);
Expand Down Expand Up @@ -2418,11 +2420,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
// update rate-limiter if policies updated
if (this.dispatchRateLimiter.isPresent()) {
if (!topicPolicies.isPresent() || !topicPolicies.get().isDispatchRateSet()) {
dispatchRateLimiter.get().onPoliciesUpdate(data);
}
}
dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().onPoliciesUpdate(data);
}
Expand Down Expand Up @@ -3018,13 +3016,7 @@ public void onUpdate(TopicPolicies policies) {
Optional<Policies> namespacePolicies = getNamespacePolicies();
initializeTopicDispatchRateLimiterIfNeeded(policies);

dispatchRateLimiter.ifPresent(limiter -> {
if (policies.isDispatchRateSet()) {
dispatchRateLimiter.get().updateDispatchRate(policies.getDispatchRate());
} else {
dispatchRateLimiter.get().updateDispatchRate();
}
});
dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);

List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>();
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(consumer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
final PolicyHierarchyValue<DispatchRateImpl> subscriptionDispatchRate;
final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy;
final PolicyHierarchyValue<DispatchRateImpl> dispatchRate;

public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
Expand All @@ -78,5 +79,6 @@ public HierarchyTopicPolicies() {
compactionThreshold = new PolicyHierarchyValue<>();
subscriptionDispatchRate = new PolicyHierarchyValue<>();
schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
dispatchRate = new PolicyHierarchyValue<>();
}
}

0 comments on commit f9297ea

Please sign in to comment.