Skip to content

Commit

Permalink
Optimize topic policy with HierarchyTopicPolicies about dispatchRate
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed Jan 29, 2022
1 parent d22ff4f commit 626f5e0
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -155,6 +156,10 @@ public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
return this.topicPolicies.getSchemaCompatibilityStrategy().get();
}

public DispatchRateImpl getDispatchRate() {
return this.topicPolicies.getDispatchRate().get();
}

private SchemaCompatibilityStrategy formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
return strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy;
}
Expand Down Expand Up @@ -187,6 +192,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
topicPolicies.getDispatchRate().updateTopicValue(data.getDispatchRate());
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
Expand Down Expand Up @@ -224,6 +230,16 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
}

private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
DispatchRateImpl topicDispathcRate = namespacePolicies.topicDispatchRate.get(cluster);
if (topicDispathcRate == null) {
topicPolicies.getDispatchRate().updateNamespaceValue(namespacePolicies.clusterDispatchRate.get(cluster));
} else {
topicPolicies.getDispatchRate().updateNamespaceValue(topicDispathcRate);
}
}

private void updateSchemaCompatibilityStrategyNamespaceValue(Policies namespacePolicies){
Expand Down Expand Up @@ -279,6 +295,15 @@ private void updateTopicPolicyByBrokerConfig() {
}
topicPolicies.getSchemaCompatibilityStrategy()
.updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config));
}

private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) {
return DispatchRateImpl.builder()
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerTopicInMsg())
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerTopicInByte())
.ratePeriodInSecond(1)
.build();
}

private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -169,7 +170,7 @@ private DispatchRate createDispatchRate() {
* broker-level
*/
public void updateDispatchRate() {
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topic, type);
if (!dispatchRate.isPresent()) {
getPoliciesDispatchRateAsync(brokerService).thenAccept(dispatchRateOp -> {
if (!dispatchRateOp.isPresent()) {
Expand All @@ -194,13 +195,14 @@ public void updateDispatchRate() {
}

public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
String topicName, Type type) {
AbstractTopic topic, Type type) {
final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
final String topicName = topic.getName();
if (type == Type.BROKER) {
return brokerService.getBrokerDispatchRateLimiter().isDispatchRateLimitingEnabled();
}

Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topic, type);
if (dispatchRate.isPresent()) {
return true;
}
Expand All @@ -210,16 +212,16 @@ public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional
}

public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService brokerService,
String topicName, Type type) {
AbstractTopic topic, Type type) {
Optional<DispatchRate> dispatchRate = Optional.empty();
String topicName = topic.getName();
final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) {
try {
switch (type) {
case TOPIC:
dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getDispatchRate);
dispatchRate = Optional.ofNullable(
isDispatchRateEnabled(topic.getDispatchRate()) ? topic.getDispatchRate() : null);
break;
case SUBSCRIPTION:
dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ public void updateRateLimiter(DispatchRate dispatchRate) {
@Override
public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.SUBSCRIPTION)) {
.isDispatchRateNeeded(topic.getBrokerService(), policies, topic, Type.SUBSCRIPTION)) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public void updateRateLimiter(DispatchRate dispatchRate) {
@Override
public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.SUBSCRIPTION)) {
.isDispatchRateNeeded(topic.getBrokerService(), policies, topic, Type.SUBSCRIPTION)) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ public Optional<DispatchRateLimiter> getRateLimiter() {
@Override
public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.REPLICATOR)) {
.isDispatchRateNeeded(topic.getBrokerService(), policies, topic, Type.REPLICATOR)) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
synchronized (dispatchRateLimiter) {
// dispatch rate limiter for topic
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(brokerService, policies, topic, Type.TOPIC)) {
.isDispatchRateNeeded(brokerService, policies, this, Type.TOPIC)) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}
boolean isDispatchRateNeeded = SubscribeRateLimiter.isDispatchRateNeeded(brokerService, policies, topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -108,7 +110,7 @@ public void setup() throws Exception {
PowerMockito.when(DispatchRateLimiter.isDispatchRateNeeded(
any(BrokerService.class),
any(Optional.class),
anyString(),
any(AbstractTopic.class),
any(DispatchRateLimiter.Type.class))
).thenReturn(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
Expand Down Expand Up @@ -150,7 +151,7 @@ public void setup() throws Exception {
PowerMockito.when(DispatchRateLimiter.isDispatchRateNeeded(
any(BrokerService.class),
any(Optional.class),
anyString(),
any(AbstractTopic.class),
any(DispatchRateLimiter.Type.class))
).thenReturn(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.Getter;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;

/**
* Topic policy hierarchy value container.
Expand All @@ -50,6 +51,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy;
final PolicyHierarchyValue<DispatchRateImpl> dispatchRate;

public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
Expand All @@ -73,5 +75,6 @@ public HierarchyTopicPolicies() {
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
dispatchRate = new PolicyHierarchyValue<>();
}
}

0 comments on commit 626f5e0

Please sign in to comment.