Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix bug in RangeCache where different instance of the key wouldn't ever match #23903

Merged
merged 7 commits into from
Jan 28, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,39 @@ K getKey() {
return localKey;
}

/**
* Get the value associated with the key. Returns null if the key does not match the key.
*
* @param key the key to match
* @return the value associated with the key, or null if the value has already been recycled or the key does not
* match
*/
V getValue(K key) {
return getValueInternal(key, false);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Get the value associated with the Map.Entry's key and value. Exact instance of the key is required to match.
* @param entry the entry which contains the key and {@link EntryWrapper} value to get the value from
* @return the value associated with the key, or null if the value has already been recycled or the key does not
* exactly match the same instance
*/
static <K, V> V getValueMatchingMapEntry(Map.Entry<K, EntryWrapper<K, V>> entry) {
return entry.getValue().getValueInternal(entry.getKey(), true);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Get the value associated with the key. Returns null if the key does not match the key associated with the
* value.
*
* @param key the key to match
* @param requireSameKeyInstance when true, the matching will be restricted to exactly the same instance of the
* key as the one stored in the wrapper. This is used to avoid any races
* when retrieving or removing the entries from the cache when the key and value
* instances are available.
* @return the value associated with the key, or null if the key does not match
*/
private V getValueInternal(K key, boolean requireSameKeyInstance) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
long stamp = lock.tryOptimisticRead();
K localKey = this.key;
V localValue = this.value;
Expand All @@ -116,7 +148,11 @@ V getValue(K key) {
localValue = this.value;
lock.unlockRead(stamp);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
}
if (localKey != key) {
// check that the given key matches the key associated with the value in the entry
lhotari marked this conversation as resolved.
Show resolved Hide resolved
// this is used to detect if the entry has already been recycled and contains another key
// when requireSameKeyInstance is true, the key must be exactly the same instance as the one stored in the
// entry to match
if (localKey != key && (requireSameKeyInstance || localKey == null || !localKey.equals(key))) {
return null;
}
return localValue;
Expand Down Expand Up @@ -236,34 +272,45 @@ public boolean exists(Key key) {
* The caller is responsible for releasing the reference.
*/
public Value get(Key key) {
return getValue(key, entries.get(key));
return getValueFromWrapper(key, entries.get(key));
}

private Value getValue(Key key, EntryWrapper<Key, Value> valueWrapper) {
private Value getValueFromWrapper(Key key, EntryWrapper<Key, Value> valueWrapper) {
if (valueWrapper == null) {
return null;
} else {
Value value = valueWrapper.getValue(key);
if (value == null) {
// the wrapper has been recycled and contains another key
return null;
}
try {
value.retain();
} catch (IllegalReferenceCountException e) {
// Value was already deallocated
return null;
}
// check that the value matches the key and that there's at least 2 references to it since
// the cache should be holding one reference and a new reference was just added in this method
if (value.refCnt() > 1 && value.matchesKey(key)) {
return value;
} else {
// Value or IdentityWrapper was recycled and already contains another value
// release the reference added in this method
value.release();
return null;
}
return getRetainedValueMatchingKey(key, value);
}
}

/**
 * Resolves a map entry to its value and retains it for the caller.
 *
 * @param entry the cache map entry holding the key and its wrapper
 * @return the retained value, or null when the wrapper no longer matches the entry's key instance or the value
 * failed validation in {@code getRetainedValueMatchingKey}
 */
private Value getValueMatchingEntry(Map.Entry<Key, EntryWrapper<Key, Value>> entry) {
    final Value candidate = EntryWrapper.getValueMatchingMapEntry(entry);
    final Key entryKey = entry.getKey();
    return getRetainedValueMatchingKey(entryKey, candidate);
}

// validates that the value matches the key and that the value has not been recycled
// which are possible due to the lack of exclusive locks in the cache and the use of reference counted objects
private Value getRetainedValueMatchingKey(Key key, Value value) {
if (value == null) {
// the wrapper has been recycled and contains another key
return null;
}
try {
value.retain();
} catch (IllegalReferenceCountException e) {
// Value was already deallocated
return null;
}
// check that the value matches the key and that there's at least 2 references to it since
// the cache should be holding one reference and a new reference was just added in this method
if (value.refCnt() > 1 && value.matchesKey(key)) {
return value;
lhotari marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Value or IdentityWrapper was recycled and already contains another value
// release the reference added in this method
value.release();
return null;
}
}

Expand All @@ -280,7 +327,7 @@ public Collection<Value> getRange(Key first, Key last) {

// Return the values of the entries found in cache
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : entries.subMap(first, true, last, true).entrySet()) {
Value value = getValue(entry.getKey(), entry.getValue());
Value value = getValueMatchingEntry(entry);
if (value != null) {
values.add(value);
}
Expand All @@ -297,6 +344,9 @@ public Collection<Value> getRange(Key first, Key last) {
* @return a pair containing the number of removed entries and their total size
*/
public Pair<Integer, Long> removeRange(Key first, Key last, boolean lastInclusive) {
if (log.isDebugEnabled()) {
log.debug("Removing entries in range [{}, {}], lastInclusive: {}", first, last, lastInclusive);
}
RemovalCounters counters = RemovalCounters.create();
Map<Key, EntryWrapper<Key, Value>> subMap = entries.subMap(first, true, last, lastInclusive);
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : subMap.entrySet()) {
Expand All @@ -320,7 +370,7 @@ private RemoveEntryResult removeEntry(Map.Entry<Key, EntryWrapper<Key, Value>> e
boolean skipInvalid, Predicate<Value> removeCondition) {
Key key = entry.getKey();
EntryWrapper<Key, Value> entryWrapper = entry.getValue();
Value value = entryWrapper.getValue(key);
Value value = getValueMatchingEntry(entry);
if (value == null) {
// the wrapper has already been recycled and contains another key
if (!skipInvalid) {
Expand Down Expand Up @@ -404,6 +454,9 @@ private Pair<Integer, Long> handleRemovalResult(RemovalCounters counters) {
* @return a pair containing the number of entries evicted and their total size
*/
public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
if (log.isDebugEnabled()) {
log.debug("Evicting entries to reach a minimum size of {}", minSize);
}
checkArgument(minSize > 0);
RemovalCounters counters = RemovalCounters.create();
while (counters.removedSize < minSize && !Thread.currentThread().isInterrupted()) {
Expand All @@ -422,6 +475,9 @@ public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
* @return a pair containing the number of entries evicted and their total size
*/
public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
if (log.isDebugEnabled()) {
log.debug("Evicting entries with timestamp <= {}", maxTimestamp);
}
RemovalCounters counters = RemovalCounters.create();
while (!Thread.currentThread().isInterrupted()) {
Map.Entry<Key, EntryWrapper<Key, Value>> entry = entries.firstEntry();
Expand Down Expand Up @@ -453,6 +509,9 @@ public long getSize() {
* @return size of removed entries
*/
public Pair<Integer, Long> clear() {
if (log.isDebugEnabled()) {
log.debug("Clearing the cache with {} entries and size {}", entries.size(), size.get());
}
RemovalCounters counters = RemovalCounters.create();
while (!Thread.currentThread().isInterrupted()) {
Map.Entry<Key, EntryWrapper<Key, Value>> entry = entries.firstEntry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,28 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.Entry;
Expand All @@ -53,18 +57,17 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import lombok.Cleanup;

@Slf4j
public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {

Expand Down Expand Up @@ -241,6 +244,108 @@ public void verifyConcurrentUsage() throws Exception {
assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
}

/**
 * Verifies that single entry reads are served from the entry cache when the position passed to
 * {@code asyncReadEntry} is a different instance than the position returned by {@code addEntry}.
 * Producers keep adding entries while consumers read random, previously added positions through a cloned
 * {@link Position}; the cache statistics must show hits only, without misses or evictions.
 */
@Test
public void verifyAsyncReadEntryUsingCache() throws Exception {
    ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();

    config.setMaxCacheSize(100 * 1024 * 1024);
    config.setCacheEvictionTimeThresholdMillis(10000);
    config.setCacheEvictionIntervalMs(10000);

    @Cleanup("shutdown")
    ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);

    ManagedLedgerConfig conf = new ManagedLedgerConfig();
    conf.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2)
            .setRetentionSizeInMB(-1).setRetentionTime(-1, TimeUnit.MILLISECONDS);
    final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my-ledger" + testName, conf);

    final int numProducers = 5;
    final int numConsumers = 10;

    final AtomicBoolean done = new AtomicBoolean();
    // all workers and the test thread wait on the barrier so that the load starts at once
    final CyclicBarrier barrier = new CyclicBarrier(numProducers + numConsumers + 1);

    List<Future<?>> futures = new ArrayList<>();
    List<Position> positions = new CopyOnWriteArrayList<>();

    for (int i = 0; i < numProducers; i++) {
        futures.add(executor.submit(() -> {
            try {
                // wait for all threads to be ready to start at once
                barrier.await();
                while (!done.get()) {
                    Position position = ledger.addEntry("entry".getBytes(StandardCharsets.UTF_8));
                    positions.add(position);
                    Thread.sleep(1);
                }
            } catch (Exception e) {
                log.error("Producer failed", e);
                throw FutureUtil.wrapToCompletionException(e);
            }
        }));
    }

    // create a dummy cursor since caching happens only when there are active consumers
    // the returned cursor is intentionally not used, opening it is what matters
    ManagedCursor cursor = ledger.openCursor("dummy");

    for (int i = 0; i < numConsumers; i++) {
        futures.add(executor.submit(() -> {
            try {
                // wait for all threads to be ready to start at once
                barrier.await();
                while (!done.get()) {
                    if (positions.isEmpty()) {
                        Thread.sleep(1);
                        continue;
                    }
                    // Simulate a replay queue read pattern where individual entries are read
                    Position randomPosition =
                            positions.get(ThreadLocalRandom.current().nextInt(positions.size()));
                    // Clone the original instance so that another instance is used in the asyncReadEntry call
                    // This is to test that keys are compared by .equals and not by reference under the covers
                    randomPosition = PositionFactory.create(randomPosition);
                    CompletableFuture<Void> future = new CompletableFuture<>();
                    ledger.asyncReadEntry(randomPosition, new AsyncCallbacks.ReadEntryCallback() {
                        @Override
                        public void readEntryComplete(Entry entry, Object ctx) {
                            entry.release();
                            future.complete(null);
                        }

                        @Override
                        public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                            future.completeExceptionally(exception);
                        }
                    }, null);
                    future.get();
                    Thread.sleep(2);
                }
            } catch (Exception e) {
                log.error("Consumer failed", e);
                throw FutureUtil.wrapToCompletionException(e);
            }
        }));
    }

    // trigger all worker threads at once to continue from the barrier
    barrier.await();

    int testDurationSeconds = 3;
    TimeUnit.SECONDS.sleep(testDurationSeconds);

    done.set(true);
    // propagates any failure that happened in a producer or a consumer
    for (Future<?> future : futures) {
        future.get();
    }

    factory.getMbean().refreshStats(testDurationSeconds, TimeUnit.SECONDS);

    assertTrue(factory.getMbean().getCacheHitsRate() > 0.0);
    assertEquals(factory.getMbean().getCacheMissesRate(), 0.0);
    assertTrue(factory.getMbean().getCacheHitsThroughput() > 0.0);
    assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
}

@Test
public void testSimple() throws Exception {
@Cleanup("shutdown")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -338,4 +339,20 @@ public void testRemoveEntryWithInvalidMatchingKey() {
cache.clear();
assertEquals(cache.getNumberOfEntries(), 0);
}
}

/**
 * Verifies that a lookup succeeds with a key that is equal to, but not the same instance as, the key that was
 * used to insert the entry.
 */
@Test
public void testGetKeyWithDifferentInstance() {
    RangeCache<Integer, RefString> cache = new RangeCache<>();
    // 129 is outside of the default Integer cache range (-128..127), so boxing it twice is expected to
    // produce two distinct instances; assertNotSame below guards this assumption
    Integer key = 129;
    cache.put(key, new RefString("129"));
    // create a different instance of the key
    Integer key2 = Integer.valueOf(129);
    // key and key2 are different instances but they are equal
    assertNotSame(key, key2);
    assertEquals(key, key2);
    // get the value using key2
    RefString s = cache.get(key2);
    // the value should be found; fail with an assertion instead of a NullPointerException when it is not
    assertTrue(s != null, "value was not found with an equal key that is a different instance");
    try {
        assertEquals(s.s, "129");
    } finally {
        // get() retains the value for the caller, the caller is responsible for releasing the reference
        s.release();
    }
    // release the entry held by the cache
    cache.clear();
}
}
Loading
Loading