[fix][broker] PIP-379 Key_Shared implementation race condition causing out-of-order message delivery #23874

Merged

Commits (28)
67605e7
Add solution to PulsarMockBookKeeper for intercepting reads
lhotari Oct 21, 2024
e40461f
Improve quiet time implementation in receiveMessages
lhotari Jan 17, 2025
7e69af5
Add debug when added to replay
lhotari Jan 17, 2025
6a354df
Enable test logging at debug level, add more logging
lhotari Jan 20, 2025
7c1d3db
Cancel pending read
lhotari Jan 20, 2025
47f7583
Add debug log to skipping pending replay read
lhotari Jan 21, 2025
4dc843b
Add test
lhotari Jan 16, 2025
0512f38
Notify also when the consumer isn't closing
lhotari Jan 21, 2025
3913e64
Postpone removals after critical sections to prevent race conditions
lhotari Jan 21, 2025
fa60a7a
Add test and more docs for OutsideCriticalSectionsExecutor
lhotari Jan 21, 2025
ec898d6
Adjust test logging - disable debug logging for key_shared related di…
lhotari Jan 21, 2025
7bc7b92
Fix failing test
lhotari Jan 21, 2025
7bca0ad
Fix race condition in test
lhotari Jan 21, 2025
c8d14f3
Revert "Add solution to PulsarMockBookKeeper for intercepting reads"
lhotari Jan 22, 2025
799fab9
Revert "Improve quiet time implementation in receiveMessages"
lhotari Jan 22, 2025
d8361bf
Fix test after reverting read handle interceptor changes
lhotari Jan 22, 2025
eef83e9
Revert "Add test and more docs for OutsideCriticalSectionsExecutor"
lhotari Jan 22, 2025
31db5fa
Revert "Postpone removals after critical sections to prevent race con…
lhotari Jan 22, 2025
231b300
Fix test after reverting previous changes
lhotari Jan 22, 2025
e734cdf
Add solution by Yubiao to prevent race condition in unblocking while …
lhotari Jan 22, 2025
a5d2029
Add "already blocked hashes" solution to dispatching phase
lhotari Jan 22, 2025
450069a
Run test with both classic and PIP-379 implementations
lhotari Jan 22, 2025
06827df
Revert havePendingReplayRead related changes
lhotari Jan 23, 2025
98942d7
Validate that no exceptions were thrown in message handler
lhotari Jan 23, 2025
65401dc
Make failing configurable when out-of-order message is a duplicate
lhotari Jan 23, 2025
ddbe35b
Improve test logging
lhotari Jan 23, 2025
4b0f08b
Remove unused import
lhotari Jan 23, 2025
be9b80c
Revert unrelated DrainingHashesTracker changes
lhotari Jan 23, 2025
DrainingHashesTracker.java
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.ToString;
@@ -50,10 +51,9 @@ public class DrainingHashesTracker {
// optimize the memory consumption of the map by using primitive int keys
private final Int2ObjectOpenHashMap<DrainingHashEntry> drainingHashes = new Int2ObjectOpenHashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
int batchLevel;
boolean unblockedWhileBatching;
private final Map<ConsumerIdentityWrapper, ConsumerDrainingHashesStats> consumerDrainingHashesStatsMap =
new ConcurrentHashMap<>();
private final Executor removalExecutor;

/**
* Represents an entry in the draining hashes tracker.
@@ -220,8 +220,13 @@ public interface UnblockingHandler {
}

public DrainingHashesTracker(String dispatcherName, UnblockingHandler unblockingHandler) {
this(dispatcherName, unblockingHandler, Runnable::run);
}

public DrainingHashesTracker(String dispatcherName, UnblockingHandler unblockingHandler, Executor removalExecutor) {
this.dispatcherName = dispatcherName;
this.unblockingHandler = unblockingHandler;
this.removalExecutor = removalExecutor;
}

/**
@@ -274,39 +279,6 @@ public void addEntry(Consumer consumer, int stickyHash) {
}
}

/**
* Start a batch operation. There could be multiple nested batch operations.
* The unblocking of sticky key hashes will be done only when the last batch operation ends.
*/
public void startBatch() {
lock.writeLock().lock();
try {
batchLevel++;
} finally {
lock.writeLock().unlock();
}
}

/**
* End a batch operation.
*/
public void endBatch() {
boolean notifyUnblocking = false;
lock.writeLock().lock();
try {
if (--batchLevel == 0 && unblockedWhileBatching) {
unblockedWhileBatching = false;
notifyUnblocking = true;
}
} finally {
lock.writeLock().unlock();
}
// notify unblocking of the hash outside the lock
if (notifyUnblocking) {
unblockingHandler.stickyKeyHashUnblocked(-1);
}
}

/**
* Reduce the reference count for a given sticky hash.
*
@@ -330,40 +302,38 @@ public void reduceRefCount(Consumer consumer, int stickyHash, boolean closing) {
+ ".");
}
if (entry.decrementRefCount()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash,
consumer.consumerId(), consumer.consumerName());
}
removalExecutor.execute(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash,
consumer.consumerId(), consumer.consumerName());
}

DrainingHashEntry removed;
boolean notifyUnblocking = false;
lock.writeLock().lock();
try {
removed = drainingHashes.remove(stickyHash);
if (!closing && removed.isBlocking()) {
if (batchLevel > 0) {
unblockedWhileBatching = true;
} else {
DrainingHashEntry removed;
boolean notifyUnblocking = false;
lock.writeLock().lock();
try {
removed = drainingHashes.remove(stickyHash);
if (removed.isBlocking()) {
notifyUnblocking = true;
}
} finally {
lock.writeLock().unlock();
}
} finally {
lock.writeLock().unlock();
}

// perform side-effects outside of the lock to reduce chances for deadlocks
// perform side-effects outside of the lock to reduce chances for deadlocks

// update the consumer specific stats
ConsumerDrainingHashesStats drainingHashesStats =
consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
if (drainingHashesStats != null) {
drainingHashesStats.clearHash(stickyHash);
}
// update the consumer specific stats
ConsumerDrainingHashesStats drainingHashesStats =
consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
if (drainingHashesStats != null) {
drainingHashesStats.clearHash(stickyHash);
}

// notify unblocking of the hash outside the lock
if (notifyUnblocking) {
unblockingHandler.stickyKeyHashUnblocked(stickyHash);
}
// notify unblocking of the hash outside the lock
if (notifyUnblocking) {
unblockingHandler.stickyKeyHashUnblocked(stickyHash);
}
});
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Draining hash {} decrementing {} consumer id:{} name:{}", dispatcherName,
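A minimal standalone sketch of the pattern the diff above introduces, assuming only what is visible in the change: when a draining hash's reference count reaches zero, the removal and its side effects are handed to a pluggable Executor rather than run inline, so a dispatcher can postpone them past its critical sections. The class and method names below are illustrative, not the PR's exact code.

import java.util.concurrent.Executor;

class DeferredRemovalSketch {
    private final Executor removalExecutor;

    DeferredRemovalSketch(Executor removalExecutor) {
        // Runnable::run (the default wired in the one-argument
        // DrainingHashesTracker constructor above) keeps the old synchronous
        // behavior; an OutsideCriticalSectionsExecutor postpones removals
        // until all critical sections have been exited.
        this.removalExecutor = removalExecutor;
    }

    void onRefCountReachedZero(int stickyHash) {
        removalExecutor.execute(() -> {
            // 1. remove the entry for stickyHash under the write lock
            // 2. update per-consumer stats outside the lock
            // 3. notify unblocking of the hash outside the lock
        });
    }
}

This replaces the removed startBatch()/endBatch() counter, which deferred only the unblocking notification while a batch was in progress.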
OutsideCriticalSectionsExecutor.java (new file)
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* Executor that runs tasks in the current thread when
* there aren't any critical sections in execution.
* If there's a critical section in execution, the tasks are queued
* and postponed until all critical sections have been exited.
* The tasks are run on the thread that exited the last critical section.
*/
public class OutsideCriticalSectionsExecutor implements Executor {
private final AtomicInteger criticalSectionsCount = new AtomicInteger();
private final Queue<Runnable> queuedTasks = new ConcurrentLinkedQueue<>();
private final ReadWriteLock executionLock = new ReentrantReadWriteLock();

/**
* Executes the given command at some time in the future.
* If there are no critical sections in execution, the command is executed immediately.
* If there are critical sections in execution, the command is queued and executed after all critical sections have
* been exited.
*/
@Override
public void execute(Runnable command) {
executionLock.writeLock().lock();
try {
if (criticalSectionsCount.get() == 0) {
command.run();
} else {
queuedTasks.add(command);
}
} finally {
executionLock.writeLock().unlock();
}
}

/**
* Enters a critical section. This method should be called before entering a critical section.
*/
public void enterCriticalSection() {
executionLock.readLock().lock();
try {
criticalSectionsCount.incrementAndGet();
} finally {
executionLock.readLock().unlock();
}
}

/**
* Exits a critical section. This method should be called after exiting a critical section.
*/
public void exitCriticalSection() {
if (criticalSectionsCount.decrementAndGet() == 0) {
runQueuedTasks();
}
}

/**
* Runs a callable which is a critical section. This method should be used when
* the result of the callable is needed and it should run as a critical section.
*/
public <T> T runCriticalSectionCallable(Callable<T> callable) {
executionLock.readLock().lock();
try {
criticalSectionsCount.incrementAndGet();
return callable.call();
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RuntimeException(e);
} finally {
executionLock.readLock().unlock();
exitCriticalSection();
}
}

private void runQueuedTasks() {
executionLock.writeLock().lock();
try {
if (criticalSectionsCount.get() != 0) {
return;
}
Runnable command;
while ((command = queuedTasks.poll()) != null) {
command.run();
}
} finally {
executionLock.writeLock().unlock();
}
}
}
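For context, a usage sketch of the executor defined above. The printing scaffold is hypothetical, but the calls match the class as written:

import java.util.concurrent.atomic.AtomicBoolean;

class OutsideCriticalSectionsExecutorExample {
    public static void main(String[] args) {
        OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor();
        AtomicBoolean ran = new AtomicBoolean();

        executor.enterCriticalSection();
        // A critical section is active, so the task is queued rather than run.
        executor.execute(() -> ran.set(true));
        System.out.println(ran.get()); // prints: false

        // Exiting the last critical section drains the queue on this thread.
        executor.exitCriticalSection();
        System.out.println(ran.get()); // prints: true

        // With no critical section active, tasks run immediately and inline.
        executor.execute(() -> System.out.println("runs inline"));
    }
}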
PersistentDispatcherMultipleConsumers.java
@@ -335,6 +335,13 @@ public synchronized void readMoreEntries() {
}
return;
}
if (havePendingReplayRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to replay in-progress.", topic.getName(),
getSubscriptionName());
}
return;
}
if (isSendInProgress()) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
@@ -379,13 +386,23 @@ public synchronized void readMoreEntries() {
long bytesToRead = calculateResult.getRight();

if (messagesToRead == -1 || bytesToRead == -1) {
// Skip read as topic/dispatcher has exceeded the dispatch rate or previous pending read hasn't completed.
// Skip read as topic/dispatcher has exceeded the dispatch rate
return;
}

Set<Position> messagesToReplayNow =
canReplayMessages() ? getMessagesToReplayNow(messagesToRead) : Collections.emptySet();
if (!messagesToReplayNow.isEmpty()) {
// before replaying, cancel possible pending read that is waiting for more entries
cancelPendingRead();
if (havePendingRead) {
// skip read since a pending read is already in progress which cannot be cancelled
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping replay read for the topic, Due to pending read in-progress.",
topic.getName(), getSubscriptionName());
}
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
messagesToReplayNow.size(), consumerList.size());
@@ -615,13 +632,6 @@ protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits)
}
}

if (havePendingReplayRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
}
return Pair.of(-1, -1L);
}

// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
messagesToRead = Math.max(messagesToRead, 1);
bytesToRead = Math.max(bytesToRead, 1);
@@ -717,6 +727,12 @@ public SubType getType() {
public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
ReadType readType = (ReadType) ctx;
if (readType == ReadType.Normal) {
if (!havePendingRead) {
log.debug("Discarding read entries as there is no pending read");
entries.forEach(Entry::release);
readMoreEntriesAsync();
return;
}
havePendingRead = false;
} else {
havePendingReplayRead = false;
@@ -1422,6 +1438,9 @@ public void cursorIsReset() {

protected boolean addMessageToReplay(long ledgerId, long entryId, long stickyKeyHash) {
if (checkIfMessageIsUnacked(ledgerId, entryId)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Adding message to replay for {}:{} hash: {}", name, ledgerId, entryId, stickyKeyHash);
}
redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
return true;
} else {
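Taken together, the dispatcher changes above tighten the interplay between normal reads and replay reads: a replay read is only issued once any waiting normal read has been cancelled, and a normal read that completes after cancellation is discarded. A condensed, hypothetical sketch of that control flow (the real logic lives in the synchronized readMoreEntries() and readEntriesComplete() shown in the diff):

class ReadSchedulingSketch {
    private boolean havePendingRead;        // a normal read is in flight
    private boolean havePendingReplayRead;  // a replay read is in flight

    synchronized void readMoreEntries(boolean haveMessagesToReplay) {
        if (havePendingReplayRead) {
            return; // start nothing while a replay read is in flight
        }
        if (haveMessagesToReplay) {
            cancelPendingRead(); // cancel a waiting normal read if possible
            if (havePendingRead) {
                return; // the pending read couldn't be cancelled; retry later
            }
            havePendingReplayRead = true;
            // ... issue the replay read ...
            return;
        }
        // ... issue a normal read, setting havePendingRead = true ...
    }

    synchronized void readEntriesComplete(boolean normalRead) {
        if (normalRead && !havePendingRead) {
            // the read was cancelled while in flight: release the
            // already-read entries and trigger another read attempt
            return;
        }
        if (normalRead) {
            havePendingRead = false;
        } else {
            havePendingReplayRead = false;
        }
        // ... dispatch the entries ...
    }

    private void cancelPendingRead() {
        // placeholder: the broker asks the managed cursor to cancel a read
        // that is still waiting for entries; cancellation fails if the read
        // is already being served, in which case havePendingRead stays true
    }
}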