Skip to content

Commit

Permalink
improve logics
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jan 3, 2025
1 parent 3a568ea commit 303e562
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,6 @@ public SubType getType() {
return SubType.Shared;
}

protected synchronized void finishedRewindAfterInProgressReading(){}

@Override
public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
ReadType readType = (ReadType) ctx;
Expand All @@ -654,7 +652,6 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
entries.forEach(Entry::release);
cursor.rewind();
shouldRewindBeforeReadingOrReplaying = false;
finishedRewindAfterInProgressReading();
readMoreEntries();
return;
}
Expand Down Expand Up @@ -938,7 +935,6 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
if (shouldRewindBeforeReadingOrReplaying) {
shouldRewindBeforeReadingOrReplaying = false;
cursor.rewind();
finishedRewindAfterInProgressReading();
}

if (readType == ReadType.Normal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic
}
}

@Override
protected synchronized void finishedRewindAfterInProgressReading() {
recentlyJoinedConsumers.clear();
}

@VisibleForTesting
public StickyKeyConsumerSelector getSelector() {
return selector;
Expand Down Expand Up @@ -159,7 +154,8 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
if (!allowOutOfOrderDelivery
&& recentlyJoinedConsumers != null
&& consumerList.size() > 1
&& cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
&& cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1
&& !shouldRewindBeforeReadingOrReplaying) {
recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
sortRecentlyJoinedConsumersIfNeeded();
}
Expand All @@ -174,23 +170,24 @@ private void sortRecentlyJoinedConsumersIfNeeded() {
if (recentlyJoinedConsumers.size() == 1) {
return;
}
// Since we check the order of queue after each consumer joined, we can only check the last two items.
boolean sortNeeded = false;
Position posPre = null;
Position posAfter = null;
for (Map.Entry<Consumer, Position> entry : recentlyJoinedConsumers.entrySet()) {
if (posPre == null) {
posPre = entry.getValue();
} else {
posPre = posAfter;
posAfter = entry.getValue();
}
if (posPre != null && posAfter != null) {
if (posPre.compareTo(posAfter) > 0) {
sortNeeded = true;
break;
}
posPre = posAfter;
}
if (posPre != null && posAfter != null) {
if (posPre.compareTo(posAfter) > 0) {
sortNeeded = true;
}
}
// Something went wrongly, sort the collection.
if (sortNeeded) {
log.error("[{}] [{}] The items in recentlyJoinedConsumers are out-of-order. {}",
topic.getName(), name, recentlyJoinedConsumers.entrySet().stream().map(entry ->
Expand Down

0 comments on commit 303e562

Please sign in to comment.