Skip to content

Commit

Permalink
Optimize topic policy with HierarchyTopicPolicies about dispatchRate
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed Feb 1, 2022
1 parent d22ff4f commit dfde011
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -155,6 +156,10 @@ public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
return this.topicPolicies.getSchemaCompatibilityStrategy().get();
}

public DispatchRateImpl getDispatchRate() {
return this.topicPolicies.getDispatchRate().get();
}

private SchemaCompatibilityStrategy formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
return strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy;
}
Expand Down Expand Up @@ -187,6 +192,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
topicPolicies.getDispatchRate().updateTopicValue(normalize(data.getDispatchRate()));
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
Expand Down Expand Up @@ -224,6 +230,25 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
}

private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
if (dispatchRate == null) {
dispatchRate = namespacePolicies.clusterDispatchRate.get(cluster);
}
topicPolicies.getDispatchRate().updateNamespaceValue(normalize(dispatchRate));
}

private DispatchRateImpl normalize(DispatchRateImpl dispatchRate) {
if (dispatchRate != null
&& (dispatchRate.getDispatchThrottlingRateInMsg() > 0
|| dispatchRate.getDispatchThrottlingRateInByte() > 0)) {
return dispatchRate;
} else {
return null;
}
}

private void updateSchemaCompatibilityStrategyNamespaceValue(Policies namespacePolicies){
Expand Down Expand Up @@ -279,6 +304,16 @@ private void updateTopicPolicyByBrokerConfig() {
}
topicPolicies.getSchemaCompatibilityStrategy()
.updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
topicPolicies.getDispatchRate().updateBrokerValue(normalizedDispatchRateInBroker(config));
}

private DispatchRateImpl normalizedDispatchRateInBroker(ServiceConfiguration config) {
return normalize(
DispatchRateImpl.builder()
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerTopicInMsg())
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerTopicInByte())
.ratePeriodInSecond(1)
.build());
}

private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
Expand Down Expand Up @@ -1092,4 +1127,9 @@ public void updateBrokerSubscriptionTypesEnabled() {
topicPolicies.getSubscriptionTypesEnabled().updateBrokerValue(
subTypeStringsToEnumSet(brokerService.pulsar().getConfiguration().getSubscriptionTypesEnabled()));
}

public void updateBrokerDispatchRate() {
topicPolicies.getDispatchRate().updateBrokerValue(
normalizedDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2221,6 +2221,9 @@ private void updateTopicMessageDispatchRate() {
this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerDispatchRate();
}
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().updateDispatchRate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,26 @@ private DispatchRate createDispatchRate() {
* broker-level
*/
public void updateDispatchRate() {
final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
DispatchRate dispatchRateValue = null;
if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) {
switch (type) {
case TOPIC:
dispatchRateValue = topic.getDispatchRate();
}
}
if (type == Type.BROKER) {
dispatchRateValue = createDispatchRate();
log.info("configured broker message-dispatch rate {}", dispatchRateValue);
} else {
log.info("[{}] configured {} message-dispatch rate at broker {}", topicName, type, dispatchRateValue);
}

if (dispatchRateValue != null) {
updateDispatchRate(dispatchRateValue);
return;
}

Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
if (!dispatchRate.isPresent()) {
getPoliciesDispatchRateAsync(brokerService).thenAccept(dispatchRateOp -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.Getter;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;

/**
* Topic policy hierarchy value container.
Expand All @@ -50,6 +51,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy;
final PolicyHierarchyValue<DispatchRateImpl> dispatchRate;

public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
Expand All @@ -73,5 +75,6 @@ public HierarchyTopicPolicies() {
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
dispatchRate = new PolicyHierarchyValue<>();
}
}

0 comments on commit dfde011

Please sign in to comment.