Skip to content

Commit

Permalink
[improve][broker] Improve the extensibility of the TopicBundleAssignm…
Browse files Browse the repository at this point in the history
…entStrategy interface class (#23773)
  • Loading branch information
rayluoluo committed Jan 7, 2025
1 parent d2edba8 commit 306d3fc
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,79 @@
*/
package org.apache.pulsar.common.naming;

import com.google.common.hash.Hashing;
import com.google.common.hash.HashFunction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;

public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignmentStrategy {
private PulsarService pulsar;
@Override
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
<<<<<<< Updated upstream
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
=======
<<<<<<< HEAD
<<<<<<< HEAD
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
=======
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes
if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
bundle.setHasNonPersistentTopic(true);
}
return bundle;
}

@Override
<<<<<<< Updated upstream
=======
<<<<<<< HEAD
<<<<<<< HEAD
public long calculateBundleHashCode(TopicName topicName) {
return getBundleHashFunc().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
=======
=======
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes
public long getHashCode(String name) {
return pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc()
.hashString(name, StandardCharsets.UTF_8)
.padToLong();
<<<<<<< Updated upstream
}

@Override
public void init(PulsarService pulsarService) {
this.pulsar = pulsarService;
=======
<<<<<<< HEAD
=======
>>>>>>> Stashed changes
}

@Override
public void init(PulsarService pulsarService) {
this.pulsar = pulsarService;
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
}

@Override
public void init(PulsarService pulsarService) {
this.pulsar = pulsarService;
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
}

@Override
public void init(PulsarService pulsarService) {
this.pulsar = pulsarService;
}

private HashFunction getBundleHashFunc() {
return Optional.of(pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc()).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,19 @@ public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn)
}

public long getLongHashCode(String name) {
<<<<<<< Updated upstream
return this.topicBundleAssignmentStrategy.getHashCode(name);
=======
<<<<<<< HEAD
<<<<<<< HEAD
return this.topicBundleAssignmentStrategy.calculateBundleHashCode(TopicName.get(name));
=======
return this.topicBundleAssignmentStrategy.getHashCode(name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
return this.topicBundleAssignmentStrategy.getHashCode(name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes
}

public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,28 @@
*/
package org.apache.pulsar.common.naming;

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.broker.PulsarService;

public interface TopicBundleAssignmentStrategy {
NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles);

<<<<<<< Updated upstream
long getHashCode(String name);
=======
<<<<<<< HEAD
<<<<<<< HEAD
default long calculateBundleHashCode(TopicName topicName) {
return Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
}
=======
long getHashCode(String name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
long getHashCode(String name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes

void init(PulsarService pulsarService);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -122,7 +123,11 @@ private NamespaceBundleFactory getNamespaceBundleFactory() {
MetadataStoreExtended store = mock(MetadataStoreExtended.class);
when(pulsar.getLocalMetadataStore()).thenReturn(store);
when(pulsar.getConfigurationMetadataStore()).thenReturn(store);
return NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());
NamespaceService namespaceService = mock(NamespaceService.class);
when(pulsar.getNamespaceService()).thenReturn(namespaceService);
NamespaceBundleFactory namespaceBundleFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());
when(namespaceService.getNamespaceBundleFactory()).thenReturn(namespaceBundleFactory);
return namespaceBundleFactory;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,15 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac
}

@Override
<<<<<<< HEAD
<<<<<<< HEAD
public long calculateBundleHashCode(TopicName topicName) {
=======
public long getHashCode(String name) {
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
public long getHashCode(String name) {
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
return 0;
}

Expand Down Expand Up @@ -133,17 +141,36 @@ public static class RoundRobinBundleAssigner implements TopicBundleAssignmentStr

@Override
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
<<<<<<< HEAD
<<<<<<< HEAD
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
=======
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
bundle.setHasNonPersistentTopic(true);
}
return bundle;
}

@Override
<<<<<<< HEAD
<<<<<<< HEAD
public long calculateBundleHashCode(TopicName topicName) {
// use topic name without partition id to decide the first hash value
=======
public long getHashCode(String name) {
// use topic name without partition id to decide the first hash value
TopicName topicName = TopicName.get(name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
public long getHashCode(String name) {
// use topic name without partition id to decide the first hash value
TopicName topicName = TopicName.get(name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
long currentPartitionTopicHash =
pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc()
.hashString(topicName.getPartitionedTopicName(), Charsets.UTF_8).padToLong();
Expand Down

0 comments on commit 306d3fc

Please sign in to comment.