Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-379: Key_Shared Draining Hashes for Improved Message Ordering #23352

Merged
merged 117 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 101 commits
Commits
Show all changes
117 commits
Select commit Hold shift + click to select a range
1fef930
Remove PIP-282 solution
lhotari Sep 18, 2024
989fa25
Implement PIP-379 draining hashes solution
lhotari Sep 20, 2024
aa66c2d
Handle TTL case for replay queue where the mark delete position moves…
lhotari Oct 2, 2024
a391306
Revisit TTL test
lhotari Oct 2, 2024
23a8126
Join ranges with single consumer
lhotari Oct 2, 2024
5ee3da2
Fix getting impacted hashes
lhotari Oct 2, 2024
3d19279
Improve
lhotari Oct 2, 2024
9e4991d
Add more logging
lhotari Oct 2, 2024
f024921
merge overlapping ranges
lhotari Oct 2, 2024
38ebadf
Fix condition for overlapping ranges
lhotari Oct 2, 2024
312f60c
Switch to use fastutil standard jar
lhotari Oct 2, 2024
b16dbe8
Refactor PendingAcksMap to use fastutil
lhotari Oct 2, 2024
070aab3
Refactor getAckOwnerConsumerAndBatchSize to use ObjectIntPair
lhotari Oct 2, 2024
f9bdaf4
Add removeAllUpTo to PendingAcksMap
lhotari Oct 2, 2024
983d915
Allow duplicates in test
lhotari Oct 2, 2024
ebe2f6e
Switch to debug level logging
lhotari Oct 2, 2024
bfa5139
Improve PendingAcksMap class
lhotari Oct 2, 2024
f0292dd
Remove reference to DrainingHashesTracker from Consumer, rely on call…
lhotari Oct 2, 2024
dc01b71
Polish
lhotari Oct 2, 2024
a9009c5
Handle removal before adding draining hashes
lhotari Oct 2, 2024
8a47b07
Use Range.contains
lhotari Oct 2, 2024
51ed234
Add tests for Range.compareTo and Range.contains
lhotari Oct 2, 2024
edd8d3c
Improve DrainingHashesTracker, implement batching
lhotari Oct 2, 2024
9d105e5
Add deduplication for readMoreEntriesAsync calls
lhotari Oct 2, 2024
6e9d584
Fix PendingAcksMap.size()
lhotari Oct 2, 2024
de3a9ec
Polish
lhotari Oct 2, 2024
d299084
Only skip the delay when there are more entries to read
lhotari Oct 2, 2024
54c5529
Remove obsolete solution added in #12890
lhotari Oct 2, 2024
e0ec1d4
Fix test
lhotari Oct 2, 2024
d97af4a
Fix another test
lhotari Oct 2, 2024
5a145e7
Make test fail consistently by using a fixed seed
lhotari Oct 2, 2024
1129da4
Fix testNoRepeatedReadAndDiscard
lhotari Oct 2, 2024
279817d
Add time limit for waiting
lhotari Oct 2, 2024
f028572
Fix NPE
lhotari Oct 2, 2024
32efe4f
Remove obsolete getLastSentPositionWhenJoining test
lhotari Oct 2, 2024
81ac922
Reduce test flakiness
lhotari Oct 2, 2024
0efff76
Remove obsolete test
lhotari Oct 2, 2024
f8f009c
Remove unused imports in test
lhotari Oct 2, 2024
aeb1a4d
Remove obsolete minReplayedPosition
lhotari Oct 2, 2024
cc887db
Rename field for more clarity
lhotari Oct 2, 2024
9099a15
Switch to use readMoreEntriesAsync to deduplicate calls
lhotari Oct 2, 2024
dcda819
Use selector logic to create sticky key hash in tests
lhotari Oct 2, 2024
53c1242
Fix java.util.NoSuchElementException: No value present
lhotari Oct 2, 2024
c90e71c
Fix test related to readMoreEntriesAsync changes
lhotari Oct 2, 2024
4770383
Fix race condition in upgrading read lock to write lock
lhotari Oct 2, 2024
fc030a9
optimize registerDrainingHashes by creating an array of ranges once
lhotari Oct 2, 2024
4729cd1
Improve code comments
lhotari Oct 2, 2024
fa220e2
Improve condition
lhotari Oct 2, 2024
3f729c4
Add more tests for HashRanges
lhotari Oct 2, 2024
e84b42b
Use single lock in PendingAcksMap when using Key_Shared
lhotari Oct 2, 2024
2b158e4
Simplify PendingAcksMap by always using separate read and write lock
lhotari Oct 2, 2024
acba9c2
Add tests for PendingAcksMapTest
lhotari Oct 2, 2024
cd55d48
Add tests for DrainingHashesTracker
lhotari Oct 2, 2024
a50ce1f
Add test case that reproduced issue 23307 which is now fixed
lhotari Oct 3, 2024
27e251a
Fix PersistentStickyKeyDispatcherMultipleConsumersTest
lhotari Oct 3, 2024
78eccb8
Revert "Remove obsolete minReplayedPosition"
lhotari Oct 3, 2024
204ffec
Revert "Remove obsolete solution added in #12890"
lhotari Oct 3, 2024
e9fa63f
Use Consumer instance in DrainingHashEntry
lhotari Oct 3, 2024
0fc5c70
Improve code comments
lhotari Oct 3, 2024
46209d8
Update javadoc about consumer instance in DrainingHashEntry
lhotari Oct 3, 2024
16c18ec
Refactor NavigableSet<Range> -> ImpactedHashRanges
lhotari Oct 5, 2024
c7e49ee
Refactor: Extract ConsumerHashAssignmentsSnapshot
lhotari Oct 5, 2024
14fef5d
Move method call outside of loop since it's making the test slower
lhotari Oct 5, 2024
e761121
Optimize snapshotting and cache getRangesByConsumer
lhotari Oct 5, 2024
a608266
Add StickyKeyConsumerSelectorUtils for internal implementation methods
lhotari Oct 5, 2024
7834d7a
Rename field to sortedRanges to improve clarity
lhotari Oct 5, 2024
fabb326
Rename ImpactedHashRanges to RemovedHashRanges
lhotari Oct 5, 2024
fa88ede
Introduce the type ImpactedConsumersResult
lhotari Oct 5, 2024
f5d8051
Use Lombok's ToString annotation
lhotari Oct 5, 2024
260d1e9
Fix javadoc description for ConsistentHashingStickyKeyConsumerSelector
lhotari Oct 5, 2024
85c7e9e
Rename variable to pendingAck
lhotari Oct 5, 2024
2c84be3
Add IllegalStateExceptions when unexpected state occurs
lhotari Oct 5, 2024
fd04d64
Remove some duplication in KeySharedSubscriptionTest
lhotari Oct 5, 2024
57f95d7
Fix issue in testOrderingWhenAddingConsumers
lhotari Oct 5, 2024
184848d
Rename variable
lhotari Oct 5, 2024
b06c9e9
Fix test
lhotari Oct 6, 2024
97395a6
Add comments
lhotari Oct 7, 2024
31b3b6e
Add STICKY_KEY_HASH_NOT_SET constant to replace the magic value 0 for…
lhotari Oct 7, 2024
2e4f379
Improve javadoc of StickyKeyConsumerSelector
lhotari Oct 7, 2024
3c1e852
Add comment about fixed seed in test
lhotari Oct 7, 2024
7f8c979
Cleanup test
lhotari Oct 7, 2024
1f9f44f
Use "stickyKeyHash != STICKY_KEY_HASH_NOT_SET" instead of "stickyKeyH…
lhotari Oct 7, 2024
721fd6b
Update pulsar-broker/src/main/java/org/apache/pulsar/broker/service/p…
lhotari Oct 7, 2024
6510b1b
Improve clarity of unset sticky key hash value
lhotari Oct 7, 2024
5f387ec
Address review comment about constant keyHashRange
lhotari Oct 7, 2024
54d9ab1
Address review feedback in ConsumerHashAssignmentsSnapshot
lhotari Oct 7, 2024
cc6cd43
Improve makeStickyKeyHash
lhotari Oct 7, 2024
96d1660
Update test to match cc6cd43 changes
lhotari Oct 7, 2024
ebd8458
Use Range.size method in tests
lhotari Oct 7, 2024
c96efe0
Improve clarity of test
lhotari Oct 7, 2024
cbed983
Extract createReadEntriesSkipCondition
lhotari Oct 7, 2024
e5470a3
Add readOpCount to ManagedCursor to prevent Key_Shared subscription f…
lhotari Oct 7, 2024
7c02595
Add hasPendingReadRequest to ManagedCursor
lhotari Oct 7, 2024
a62b71c
Extract unblocking logic to KeySharedUnblockingHandler
lhotari Oct 7, 2024
303ea5c
Remove unused method
lhotari Oct 7, 2024
9b59211
Unblock when redelivery messages have been added
lhotari Oct 7, 2024
040bebe
Rename KeySharedUnblockingHandler to RescheduleReadHandler
lhotari Oct 7, 2024
d68db0f
Fix test
lhotari Oct 7, 2024
43da603
Rename method
lhotari Oct 7, 2024
d3e75ed
Improve logic
lhotari Oct 7, 2024
a445e0c
Fix test
lhotari Oct 7, 2024
82540e4
Improve test
lhotari Oct 7, 2024
84ad0b9
Fix bug: Remove blocking of replay entries while adding entries
lhotari Oct 7, 2024
29df175
Remove unused fields after removing logic
lhotari Oct 7, 2024
2d339a1
Revert "Add readOpCount to ManagedCursor to prevent Key_Shared subscr…
lhotari Oct 7, 2024
f4402e8
Revert "Add hasPendingReadRequest to ManagedCursor"
lhotari Oct 7, 2024
ead7bce
Revisit RescheduleReadHandler used for the Key_Shared implementation
lhotari Oct 7, 2024
bf1e89c
Remove unused field
lhotari Oct 7, 2024
186dc30
Revisit removeConsumer method
lhotari Oct 7, 2024
fbd7695
Rename parameter
lhotari Oct 7, 2024
22e1709
Polish
lhotari Oct 7, 2024
58bddf7
Add some tests for RescheduleReadHandlerTest
lhotari Oct 8, 2024
fb7a8ea
Add test for adding 1000 consumers
lhotari Oct 8, 2024
92f9a29
Optimize snapshotting by assuming that entries are sorted
lhotari Oct 8, 2024
640b48a
Optimize snapshotting step by assuming sorted order as input
lhotari Oct 8, 2024
a81393e
Disable perf test by default since there's no validation to be performed
lhotari Oct 8, 2024
34f925d
Remove invalid test case that makes assumptions about the implementation
lhotari Oct 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ The Apache Software License, Version 2.0
- com.fasterxml.jackson.module-jackson-module-parameter-names-2.17.2.jar
* Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
* Fastutil -- it.unimi.dsi-fastutil-8.5.14.jar
* Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.17.0.jar
* Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar
* Gson
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,12 @@ default void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, R
asyncReadEntriesOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition);
}

/**
* Returns whether there is a pending read operation in progress.
* @return true if there is a pending read operation in progress
*/
boolean hasPendingReadRequest();
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved

/**
* Cancel a previously scheduled asyncReadEntriesOrWait operation.
*
Expand Down Expand Up @@ -900,4 +906,14 @@ default ManagedCursorAttributes getManagedCursorAttributes() {
int applyMaxSizeCap(int maxEntries, long maxSizeBytes);

void updateReadStats(int readEntriesCount, long readEntriesSize);

/**
* Get the number of read operations performed on this cursor.
* This is used in Pulsar's Key_Shared dispatcher to determine whether a pending read needs to be cancelled
* in order to process entries in the replay queue when a hash is unblocked.
* @return the number of read operations performed on this cursor
*/
default long getReadOpCount() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ public class ManagedCursorImpl implements ManagedCursor {
private int individualDeletedMessagesSerializedSize;
private static final String COMPACTION_CURSOR_NAME = "__compaction";
private volatile boolean cacheReadEntry = false;
private static final AtomicLongFieldUpdater<ManagedCursorImpl> READ_OP_COUNT_UPDATER =
AtomicLongFieldUpdater.newUpdater(ManagedCursorImpl.class, "readOpCount");
private volatile long readOpCount;

// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;
Expand Down Expand Up @@ -571,6 +574,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
return;
}

READ_OP_COUNT_UPDATER.incrementAndGet(ManagedCursorImpl.this);
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> {
if (log.isDebugEnabled()) {
log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
Expand Down Expand Up @@ -1102,6 +1106,7 @@ public boolean cancelPendingReadRequest() {
return op != null;
}

@Override
public boolean hasPendingReadRequest() {
return WAITING_READ_OP_UPDATER.get(this) != null;
}
Expand Down Expand Up @@ -3804,13 +3809,19 @@ public ManagedCursorAttributes getManagedCursorAttributes() {
return ATTRIBUTES_UPDATER.updateAndGet(this, old -> old != null ? old : new ManagedCursorAttributes(this));
}

@Override
public long getReadOpCount() {
return readOpCount;
}

@Override
public ManagedLedgerInternalStats.CursorStats getCursorStats() {
ManagedLedgerInternalStats.CursorStats cs = new ManagedLedgerInternalStats.CursorStats();
cs.markDeletePosition = getMarkDeletedPosition().toString();
cs.readPosition = getReadPosition().toString();
cs.waitingReadOp = hasPendingReadRequest();
cs.pendingReadOps = getPendingReadOpsCount();
cs.readOpCount = getReadOpCount();
cs.messagesConsumedCounter = getMessagesConsumedCounter();
cs.cursorLedger = getCursorLedger();
cs.cursorLedgerLastEntry = getCursorLedgerLastEntry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntrie

}

@Override
public boolean hasPendingReadRequest() {
return false;
}

@Override
public boolean cancelPendingReadRequest() {
return true;
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ flexible messaging model and an intuitive client API.</description>
<bouncycastle.bcpkix-fips.version>1.0.7</bouncycastle.bcpkix-fips.version>
<bouncycastle.bc-fips.version>1.0.2.5</bouncycastle.bc-fips.version>
<jackson.version>2.17.2</jackson.version>
<fastutil.version>8.5.14</fastutil.version>
<reflections.version>0.10.2</reflections.version>
<swagger.version>1.6.2</swagger.version>
<puppycrawl.checkstyle.version>10.14.2</puppycrawl.checkstyle.version>
Expand Down Expand Up @@ -911,6 +912,12 @@ flexible messaging model and an intuitive client API.</description>
<scope>import</scope>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<version>${fastutil.version}</version>
</dependency>

<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "For Key_Shared subscriptions, when a blocked key hash gets unblocked,"
+ " a redelivery will be attempted after a delay. This setting controls the delay."
+ " The reason to have the delay is to batch multiple unblocking events instead of triggering"
+ " redelivery for each unblocking event.",
dynamic = true
)
private long keySharedUnblockingIntervalMs = 10L;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "
Expand Down
5 changes: 5 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void run(Timeout timeout) throws Exception {
lastTickRun = clock.millis();
currentTimeoutTarget = -1;
this.timeout = null;
dispatcher.readMoreEntries();
dispatcher.readMoreEntriesAsync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3351,7 +3351,7 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC
try {
dispatcherList.forEach(dispatcher -> {
dispatcher.unBlockDispatcherOnUnackedMsgs();
executor().execute(() -> dispatcher.readMoreEntries());
dispatcher.readMoreEntriesAsync();
log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
blockedDispatchers.remove(dispatcher);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,18 @@
*/
package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.util.Murmur3_32Hash;

/**
* This is a consumer selector based fixed hash range.
*
* The implementation uses consistent hashing to evenly split, the
* number of keys assigned to each consumer.
* This is a consumer selector using consistent hashing to evenly split
* the number of keys assigned to each consumer.
*/
public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyConsumerSelector {
// use NUL character as field separator for hash key calculation
Expand All @@ -47,14 +42,22 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
private final ConsumerNameIndexTracker consumerNameIndexTracker = new ConsumerNameIndexTracker();

private final int numberOfPoints;
private final Range keyHashRange;
private ConsumerHashAssignmentsSnapshot consumerHashAssignmentsSnapshot;

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
this(numberOfPoints, DEFAULT_RANGE_SIZE - 1);
}

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints, int rangeMaxValue) {
this.hashRing = new TreeMap<>();
this.numberOfPoints = numberOfPoints;
this.keyHashRange = Range.of(STICKY_KEY_HASH_NOT_SET + 1, rangeMaxValue);
this.consumerHashAssignmentsSnapshot = ConsumerHashAssignmentsSnapshot.empty();
}

@Override
public CompletableFuture<Void> addConsumer(Consumer consumer) {
public CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
Expand All @@ -72,7 +75,11 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
consumerNameIndexTracker.decreaseConsumerRefCount(removed);
}
}
return CompletableFuture.completedFuture(null);
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return CompletableFuture.completedFuture(impactedConsumers);
} finally {
rwLock.writeLock().unlock();
}
Expand All @@ -88,14 +95,14 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
* @param hashRingPointIndex the index of the hash ring point
* @return the hash value
*/
private static int calculateHashForConsumerAndIndex(Consumer consumer, int consumerNameIndex,
private int calculateHashForConsumerAndIndex(Consumer consumer, int consumerNameIndex,
int hashRingPointIndex) {
String key = consumer.consumerName() + KEY_SEPARATOR + consumerNameIndex + KEY_SEPARATOR + hashRingPointIndex;
return Murmur3_32Hash.getInstance().makeHash(key.getBytes());
return makeStickyKeyHash(key.getBytes());
}

@Override
public void removeConsumer(Consumer consumer) {
public ImpactedConsumersResult removeConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
Expand All @@ -109,6 +116,11 @@ public void removeConsumer(Consumer consumer) {
}
}
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
consumerHashAssignmentsSnapshot = assignmentsAfter;
return impactedConsumers;
} finally {
rwLock.writeLock().unlock();
}
Expand All @@ -134,32 +146,58 @@ public Consumer select(int hash) {
}

@Override
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new IdentityHashMap<>();
public Range getKeyHashRange() {
return keyHashRange;
}

@Override
public ConsumerHashAssignmentsSnapshot getConsumerHashAssignmentsSnapshot() {
rwLock.readLock().lock();
try {
if (hashRing.isEmpty()) {
return result;
}
int start = 0;
int lastKey = 0;
for (Map.Entry<Integer, ConsumerIdentityWrapper> entry: hashRing.entrySet()) {
Consumer consumer = entry.getValue().consumer;
result.computeIfAbsent(consumer, key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
lastKey = entry.getKey();
start = lastKey + 1;
}
// Handle wrap-around in the hash ring, the first consumer will also contain the range from the last key
// to the maximum value of the hash range
Consumer firstConsumer = hashRing.firstEntry().getValue().consumer;
List<Range> ranges = result.get(firstConsumer);
if (lastKey != Integer.MAX_VALUE - 1) {
ranges.add(Range.of(lastKey + 1, Integer.MAX_VALUE - 1));
}
return consumerHashAssignmentsSnapshot;
} finally {
rwLock.readLock().unlock();
}
return result;
}

private ConsumerHashAssignmentsSnapshot internalGetConsumerHashAssignmentsSnapshot() {
if (hashRing.isEmpty()) {
return ConsumerHashAssignmentsSnapshot.empty();
}
SortedMap<Range, Consumer> result = new TreeMap<>();
int start = getKeyHashRange().getStart();
int lastKey = -1;
Consumer previousConsumer = null;
Range previousRange = null;
for (Map.Entry<Integer, ConsumerIdentityWrapper> entry: hashRing.entrySet()) {
Consumer consumer = entry.getValue().consumer;
Range range;
if (consumer == previousConsumer) {
// join ranges
result.remove(previousRange);
range = Range.of(previousRange.getStart(), entry.getKey());
} else {
range = Range.of(start, entry.getKey());
}
result.put(range, consumer);
lastKey = entry.getKey();
start = lastKey + 1;
previousConsumer = consumer;
previousRange = range;
}
// Handle wrap-around
Consumer firstConsumer = hashRing.firstEntry().getValue().consumer;
if (lastKey != getKeyHashRange().getEnd()) {
Range range;
if (firstConsumer == previousConsumer && previousRange.getEnd() == lastKey) {
// join ranges
result.remove(previousRange);
range = Range.of(previousRange.getStart(), getKeyHashRange().getEnd());
} else {
range = Range.of(lastKey + 1, getKeyHashRange().getEnd());
}
result.put(range, firstConsumer);
}
return ConsumerHashAssignmentsSnapshot.of(result);
}
}
Loading
Loading