Skip to content

Commit

Permalink
Implement PIP-379 draining hashes solution
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Oct 1, 2024
1 parent c99697b commit 923f379
Show file tree
Hide file tree
Showing 22 changed files with 906 additions and 138 deletions.
1 change: 1 addition & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ The Apache Software License, Version 2.0
- com.fasterxml.jackson.module-jackson-module-parameter-names-2.17.2.jar
* Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
* Fastutil -- it.unimi.dsi-fastutil-core-8.5.14.jar
* Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.17.0.jar
* Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar
* Gson
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ flexible messaging model and an intuitive client API.</description>
<bouncycastle.bcpkix-fips.version>1.0.7</bouncycastle.bcpkix-fips.version>
<bouncycastle.bc-fips.version>1.0.2.5</bouncycastle.bc-fips.version>
<jackson.version>2.17.2</jackson.version>
<fastutil.version>8.5.14</fastutil.version>
<reflections.version>0.10.2</reflections.version>
<swagger.version>1.6.2</swagger.version>
<puppycrawl.checkstyle.version>10.14.2</puppycrawl.checkstyle.version>
Expand Down Expand Up @@ -911,6 +912,12 @@ flexible messaging model and an intuitive client API.</description>
<scope>import</scope>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
<version>${fastutil.version}</version>
</dependency>

<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "For Key_Shared subscriptions, when a blocked key hash gets unblocked,"
+ " a redelivery will be attempted after a delay. This setting controls the delay."
+ " The reason to have the delay is to batch multiple unblocking events instead of triggering"
+ " redelivery for each unblocking event.",
dynamic = true
)
private long keySharedUnblockingIntervalMs = 10L;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "
Expand Down
5 changes: 5 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.util.Murmur3_32Hash;

/**
* This is a consumer selector based fixed hash range.
Expand All @@ -47,16 +47,23 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
private final ConsumerNameIndexTracker consumerNameIndexTracker = new ConsumerNameIndexTracker();

private final int numberOfPoints;
private final int rangeSize;

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
this(numberOfPoints, DEFAULT_RANGE_SIZE);
}

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints, int rangeSize) {
this.hashRing = new TreeMap<>();
this.numberOfPoints = numberOfPoints;
this.rangeSize = rangeSize;
}

@Override
public CompletableFuture<Void> addConsumer(Consumer consumer) {
public CompletableFuture<Map<Consumer, NavigableSet<Range>>> addConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
Map<Range, Consumer> mappingBefore = internalGetKeyHashRangeToConsumerMapping();
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
// Insert multiple points on the hash ring for every consumer
// The points are deterministically added based on the hash of the consumer name
Expand All @@ -72,12 +79,20 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
consumerNameIndexTracker.decreaseConsumerRefCount(removed);
}
}
return CompletableFuture.completedFuture(null);
Map<Range, Consumer> mappingAfter = internalGetKeyHashRangeToConsumerMapping();
Map<Consumer, NavigableSet<Range>> impactedRanges =
HashRanges.resolveImpactedExistingConsumers(mappingBefore, mappingAfter);
return CompletableFuture.completedFuture(impactedRanges);
} finally {
rwLock.writeLock().unlock();
}
}

@Override
public int makeStickyKeyHash(byte[] stickyKey) {
return HashRanges.makeStickyKeyHash(stickyKey, rangeSize);
}

/**
* Calculate the hash for a consumer and hash ring point.
* The hash is calculated based on the consumer name, consumer name index, and hash ring point index.
Expand All @@ -88,16 +103,17 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
* @param hashRingPointIndex the index of the hash ring point
* @return the hash value
*/
private static int calculateHashForConsumerAndIndex(Consumer consumer, int consumerNameIndex,
private int calculateHashForConsumerAndIndex(Consumer consumer, int consumerNameIndex,
int hashRingPointIndex) {
String key = consumer.consumerName() + KEY_SEPARATOR + consumerNameIndex + KEY_SEPARATOR + hashRingPointIndex;
return Murmur3_32Hash.getInstance().makeHash(key.getBytes());
return makeStickyKeyHash(key.getBytes());
}

@Override
public void removeConsumer(Consumer consumer) {
public Map<Consumer, NavigableSet<Range>> removeConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
Map<Range, Consumer> mappingBefore = internalGetKeyHashRangeToConsumerMapping();
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
int consumerNameIndex = consumerNameIndexTracker.getTrackedIndex(consumerIdentityWrapper);
if (consumerNameIndex > -1) {
Expand All @@ -109,6 +125,10 @@ public void removeConsumer(Consumer consumer) {
}
}
}
Map<Range, Consumer> mappingAfter = internalGetKeyHashRangeToConsumerMapping();
Map<Consumer, NavigableSet<Range>> impactedRanges =
HashRanges.resolveImpactedExistingConsumers(mappingBefore, mappingAfter);
return impactedRanges;
} finally {
rwLock.writeLock().unlock();
}
Expand All @@ -133,6 +153,11 @@ public Consumer select(int hash) {
}
}

@Override
public Range getKeyHashRange() {
return Range.of(0, rangeSize - 1);
}

@Override
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new IdentityHashMap<>();
Expand All @@ -143,23 +168,70 @@ public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
}
int start = 0;
int lastKey = 0;
Consumer previousConsumer = null;
for (Map.Entry<Integer, ConsumerIdentityWrapper> entry: hashRing.entrySet()) {
Consumer consumer = entry.getValue().consumer;
result.computeIfAbsent(consumer, key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
if (consumer == previousConsumer) {
List<Range> ranges = result.get(consumer);
Range previousRange = ranges.remove(ranges.size() - 1);
// join ranges
ranges.add(Range.of(previousRange.getStart(), entry.getKey()));
} else {
result.computeIfAbsent(consumer, key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
}
lastKey = entry.getKey();
start = lastKey + 1;
previousConsumer = consumer;
}
// Handle wrap-around in the hash ring, the first consumer will also contain the range from the last key
// to the maximum value of the hash range
Consumer firstConsumer = hashRing.firstEntry().getValue().consumer;
List<Range> ranges = result.get(firstConsumer);
if (lastKey != Integer.MAX_VALUE - 1) {
ranges.add(Range.of(lastKey + 1, Integer.MAX_VALUE - 1));
if (lastKey != rangeSize - 1) {
Range lastRange = ranges.get(ranges.size() - 1);
if (lastRange.getEnd() == lastKey) {
// join ranges
ranges.remove(ranges.size() - 1);
ranges.add(Range.of(lastRange.getStart(), rangeSize - 1));
} else {
ranges.add(Range.of(lastKey + 1, rangeSize - 1));
}
}
} finally {
rwLock.readLock().unlock();
}
return result;
}

@Override
public Map<Range, Consumer> getKeyHashRangeToConsumerMapping() {
rwLock.readLock().lock();
try {
return internalGetKeyHashRangeToConsumerMapping();
} finally {
rwLock.readLock().unlock();
}
}

private Map<Range, Consumer> internalGetKeyHashRangeToConsumerMapping() {
Map<Range, Consumer> result = new TreeMap<>();
if (hashRing.isEmpty()) {
return result;
}
int start = 0;
int lastKey = 0;
for (Map.Entry<Integer, ConsumerIdentityWrapper> entry: hashRing.entrySet()) {
Consumer consumer = entry.getValue().consumer;
result.put(Range.of(start, entry.getKey()), consumer);
lastKey = entry.getKey();
start = lastKey + 1;
}
// Handle wrap-around
Consumer firstConsumer = hashRing.firstEntry().getValue().consumer;
if (lastKey != rangeSize - 1) {
result.put(Range.of(lastKey + 1, rangeSize - 1), firstConsumer);
}
return result;
}
}
Loading

0 comments on commit 923f379

Please sign in to comment.