From b8efc535116ddec42f15e8bc6ce3365f85df4270 Mon Sep 17 00:00:00 2001 From: rayluoluo Date: Tue, 7 Jan 2025 12:47:37 +0800 Subject: [PATCH] [improve][broker] Improve the extensibility of the TopicBundleAssignmentStrategy interface class (#23773) --- .../ConsistentHashingTopicBundleAssigner.java | 13 +++++++++++-- .../naming/TopicBundleAssignmentStrategyTest.java | 2 ++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java index 677c1e8633556..ec19708948619 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/ConsistentHashingTopicBundleAssigner.java @@ -22,9 +22,14 @@ import java.nio.charset.StandardCharsets; import java.util.Optional; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.namespace.NamespaceService; public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy { private PulsarService pulsar; + + private volatile HashFunction hashFunction; + @Override public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) { NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName)); @@ -36,7 +41,10 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac @Override public long calculateBundleHashCode(TopicName topicName) { - return getBundleHashFunc().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong(); + if (hashFunction == null) { + hashFunction = getBundleHashFunc(); + } + return hashFunction.hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong(); } @Override @@ -45,6 +53,7 @@ public void init(PulsarService pulsarService) { } private HashFunction getBundleHashFunc() { - return Optional.of(pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc()).get(); + return Optional.ofNullable(pulsar.getNamespaceService()).map(NamespaceService::getNamespaceBundleFactory) + .map(NamespaceBundleFactory::getHashFunc).orElseThrow(() -> new RuntimeException("HashFunc not specified")); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java index d5719d12985be..d03be7a7c4e5a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/TopicBundleAssignmentStrategyTest.java @@ -30,6 +30,8 @@ import com.google.common.hash.Hashing; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Optional;