Skip to content

Commit

Permalink
Optimize topic policy with HierarchyTopicPolicies about subscriptionD…
Browse files Browse the repository at this point in the history
…ispatchRate (apache#14151)
  • Loading branch information
AnonHxy authored and nicklixinyang committed Apr 20, 2022
1 parent 29b510f commit 2c046d3
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -151,6 +152,10 @@ public AbstractTopic(String topic, BrokerService brokerService) {
updatePublishDispatcher(Optional.empty());
}

public DispatchRateImpl getSubscriptionDispatchRate() {
return this.topicPolicies.getSubscriptionDispatchRate().get();
}

public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
return this.topicPolicies.getSchemaCompatibilityStrategy().get();
}
Expand Down Expand Up @@ -188,6 +193,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
}

Expand Down Expand Up @@ -227,9 +233,26 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
updateNamespaceSubscriptionDispatchRate(namespacePolicies,
brokerService.getPulsar().getConfig().getClusterName());
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
}

private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies, String cluster) {
topicPolicies.getSubscriptionDispatchRate()
.updateNamespaceValue(normalize(namespacePolicies.subscriptionDispatchRate.get(cluster)));
}

private DispatchRateImpl normalize(DispatchRateImpl dispatchRate) {
if (dispatchRate != null
&& (dispatchRate.getDispatchThrottlingRateInMsg() > 0
|| dispatchRate.getDispatchThrottlingRateInByte() > 0)) {
return dispatchRate;
} else {
return null;
}
}

private void updateSchemaCompatibilityStrategyNamespaceValue(Policies namespacePolicies){
if (isSystemTopic()) {
return;
Expand Down Expand Up @@ -283,10 +306,19 @@ private void updateTopicPolicyByBrokerConfig() {
if (isSystemTopic()) {
schemaCompatibilityStrategy = config.getSystemTopicSchemaCompatibilityStrategy();
}
topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(subscriptionDispatchRateInBroker(config));
topicPolicies.getSchemaCompatibilityStrategy()
.updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
}

private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration config) {
return DispatchRateImpl.builder()
.dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerSubscriptionInMsg())
.dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerSubscriptionInByte())
.ratePeriodInSecond(1)
.build();
}

private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
EnumSet<SubType> subTypes = EnumSet.noneOf(SubType.class);
for (String subTypeStr : CollectionUtils.emptyIfNull(getSubscriptionTypesEnabled)) {
Expand Down Expand Up @@ -1098,4 +1130,9 @@ public void updateBrokerSubscriptionTypesEnabled() {
topicPolicies.getSubscriptionTypesEnabled().updateBrokerValue(
subTypeStringsToEnumSet(brokerService.pulsar().getConfiguration().getSubscriptionTypesEnabled()));
}

public void updateBrokerSubscriptionDispatchRate() {
topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2259,6 +2259,9 @@ private void updateSubscriptionMessageDispatchRate() {
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic subscription
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerSubscriptionDispatchRate();
}
topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
Dispatcher dispatcher = persistentSubscription.getDispatcher();
if (dispatcher != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;

public interface Dispatcher {
void addConsumer(Consumer consumer) throws BrokerServiceException;
Expand Down Expand Up @@ -92,12 +90,12 @@ default Optional<DispatchRateLimiter> getRateLimiter() {
return Optional.empty();
}

default void updateRateLimiter(DispatchRate dispatchRate) {

default void updateRateLimiter() {
//No-op
}

default void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
//No-op
default boolean initializeDispatchRateLimiterIfNeeded() {
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ private DispatchRate createDispatchRate() {
* broker-level
*/
public void updateDispatchRate() {
switch (type) {
case SUBSCRIPTION:
updateDispatchRate(topic.getSubscriptionDispatchRate());
return;
}

Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
if (!dispatchRate.isPresent()) {
getPoliciesDispatchRateAsync(brokerService).thenAccept(dispatchRateOp -> {
Expand Down Expand Up @@ -457,7 +463,7 @@ public long getDispatchRateOnByte() {
}


private static boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
public static boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
return dispatchRate != null && (dispatchRate.getDispatchThrottlingRateInMsg() > 0
|| dispatchRate.getDispatchThrottlingRateInByte() > 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -128,7 +126,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
? new InMemoryRedeliveryTracker()
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
this.initializeDispatchRateLimiterIfNeeded();
}

@Override
Expand Down Expand Up @@ -822,25 +820,20 @@ public Optional<DispatchRateLimiter> getRateLimiter() {
}

@Override
public void updateRateLimiter(DispatchRate dispatchRate) {
if (!this.dispatchRateLimiter.isPresent() && dispatchRate != null) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
public void updateRateLimiter() {
if (!initializeDispatchRateLimiterIfNeeded()) {
this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
}
this.dispatchRateLimiter.ifPresent(limiter -> {
if (dispatchRate != null) {
this.dispatchRateLimiter.get().updateDispatchRate(dispatchRate);
} else {
this.dispatchRateLimiter.get().updateDispatchRate();
}
});
}

@Override
public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.SUBSCRIPTION)) {
public boolean initializeDispatchRateLimiterIfNeeded() {
if (!dispatchRateLimiter.isPresent()
&& DispatchRateLimiter.isDispatchRateEnabled(topic.getSubscriptionDispatchRate())) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
return true;
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -84,7 +82,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(),
TimeUnit.MILLISECONDS);
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
this.initializeDispatchRateLimiterIfNeeded();
}

protected void scheduleReadOnActiveConsumer() {
Expand Down Expand Up @@ -551,25 +549,20 @@ public Optional<DispatchRateLimiter> getRateLimiter() {
}

@Override
public void updateRateLimiter(DispatchRate dispatchRate) {
if (!this.dispatchRateLimiter.isPresent() && dispatchRate != null) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
public void updateRateLimiter() {
if (!initializeDispatchRateLimiterIfNeeded()) {
this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
}
this.dispatchRateLimiter.ifPresent(limiter -> {
if (dispatchRate != null) {
this.dispatchRateLimiter.get().updateDispatchRate(dispatchRate);
} else {
this.dispatchRateLimiter.get().updateDispatchRate();
}
});
}

@Override
public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.SUBSCRIPTION)) {
public boolean initializeDispatchRateLimiterIfNeeded() {
if (!dispatchRateLimiter.isPresent()
&& DispatchRateLimiter.isDispatchRateEnabled(topic.getSubscriptionDispatchRate())) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
return true;
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
subscriptions.forEach((name, subscription) -> {
Dispatcher dispatcher = subscription.getDispatcher();
if (dispatcher != null) {
dispatcher.initializeDispatchRateLimiterIfNeeded(policies);
dispatcher.initializeDispatchRateLimiterIfNeeded();
}
});

Expand Down Expand Up @@ -3031,7 +3031,7 @@ public void onUpdate(TopicPolicies policies) {
consumerCheckFutures.add(consumer.checkPermissionsAsync().thenRun(() -> {
Dispatcher dispatcher = sub.getDispatcher();
if (dispatcher != null) {
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
dispatcher.updateRateLimiter();
}
}));
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,9 @@ public void testProducerSuccessOnEncryptionRequiredTopic() throws Exception {
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());

Expand Down Expand Up @@ -1414,6 +1417,9 @@ public void testProducerFailureOnEncryptionRequiredTopic() throws Exception {
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());

Expand Down Expand Up @@ -1447,6 +1453,9 @@ public void testProducerFailureOnEncryptionRequiredOnBroker() throws Exception {
policies.topicDispatchRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
policies.clusterDispatchRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());

Expand Down Expand Up @@ -1478,6 +1487,9 @@ public void testSendSuccessOnEncryptionRequiredTopic() throws Exception {
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());

Expand Down Expand Up @@ -1515,6 +1527,9 @@ public void testSendFailureOnEncryptionRequiredTopic() throws Exception {
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
policies.clusterDispatchRate = Maps.newHashMap();
// add `clusterDispatchRate` otherwise there will be a NPE
// `org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceSubscriptionDispatchRate`
policies.subscriptionDispatchRate = Maps.newHashMap();
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.Getter;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;

/**
* Topic policy hierarchy value container.
Expand All @@ -50,6 +51,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
final PolicyHierarchyValue<DispatchRateImpl> subscriptionDispatchRate;
final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy;

public HierarchyTopicPolicies() {
Expand All @@ -74,6 +76,7 @@ public HierarchyTopicPolicies() {
delayedDeliveryEnabled = new PolicyHierarchyValue<>();
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
subscriptionDispatchRate = new PolicyHierarchyValue<>();
schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
}
}

0 comments on commit 2c046d3

Please sign in to comment.