[fix][broker] Fix bug in RangeCache where different instance of the key wouldn't ever match #23903
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@              Coverage Diff               @@
##             master   #23903       +/-   ##
============================================
+ Coverage     73.57%   74.18%    +0.60%
+ Complexity    32624    31835      -789
============================================
  Files          1877     1854       -23
  Lines        139502   143653     +4151
  Branches      15299    16321     +1022
============================================
+ Hits         102638   106568     +3930
+ Misses        28908    28701      -207
- Partials       7956     8384      +428
Flags with carried forward coverage won't be shown.
To prevent future regressions like #23900, it would be necessary to add specific tests to ensure that replay queue reads don't cause cache misses in a tailing read scenario. The regression is specific to tailing reads.

In the current broker cache implementation, catchup reads will result in cache misses for replay queue reads. This behavior is not a regression, but it has a performance impact at least on Key_Shared subscriptions, where it's common for entries to get added to the replay queue. Adding caching for that type of scenario as well is a future improvement that I'm planning to address. Since we don't currently have sufficient test coverage for the various scenarios, regressions like #23900 can happen.
Lgtm
package org.apache.bookkeeper.mledger.util;
import io.netty.util.Recycler;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
import org.testng.annotations.Test;
public class EntryWrapperPerformanceTestNG {
private static class EntryWrapper<K, V> {
private final Recycler.Handle<EntryWrapper> recyclerHandle;
private static final Recycler<EntryWrapper> RECYCLER = new Recycler<EntryWrapper>() {
@Override
protected EntryWrapper newObject(Handle<EntryWrapper> recyclerHandle) {
return new EntryWrapper(recyclerHandle);
}
};
private final StampedLock lock = new StampedLock();
private K key;
private V value;
long size;
private EntryWrapper(Recycler.Handle<EntryWrapper> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
static <K, V> EntryWrapper<K, V> create(K key, V value, long size) {
EntryWrapper<K, V> entryWrapper = RECYCLER.get();
long stamp = entryWrapper.lock.writeLock();
entryWrapper.key = key;
entryWrapper.value = value;
entryWrapper.size = size;
entryWrapper.lock.unlockWrite(stamp);
return entryWrapper;
}
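// StampedLock optimistic-read pattern (used by the getters below): read the fields
// without blocking, then validate the stamp; fall back to a full read lock only if
// a concurrent write invalidated the optimistic read.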
K getKey() {
long stamp = lock.tryOptimisticRead();
K localKey = key;
if (!lock.validate(stamp)) {
stamp = lock.readLock();
localKey = key;
lock.unlockRead(stamp);
}
return localKey;
}
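// Note: the key comparison in getValue below (localKey != key) is by reference,
// mirroring the pre-fix RangeCache behavior discussed in this PR: an equal-but-distinct
// key instance never matches, so the lookup is treated as a miss.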
V getValue(K key) {
long stamp = lock.tryOptimisticRead();
K localKey = this.key;
V localValue = this.value;
if (!lock.validate(stamp)) {
stamp = lock.readLock();
localKey = this.key;
localValue = this.value;
lock.unlockRead(stamp);
}
if (localKey != key) {
return null;
}
return localValue;
}
long getSize() {
long stamp = lock.tryOptimisticRead();
long localSize = size;
if (!lock.validate(stamp)) {
stamp = lock.readLock();
localSize = size;
lock.unlockRead(stamp);
}
return localSize;
}
void recycle() {
key = null;
value = null;
size = 0;
recyclerHandle.recycle(this);
}
}
private static final int ITERATIONS = 1_000_000;
private static final int THREAD_COUNT = 10; // Simulating 10 concurrent threads
private static final int TEST_RUNS = 100; // Run each test 100 times
private long getGCCount() {
long count = 0;
for (GarbageCollectorMXBean gcBean : ManagementFactory.getGarbageCollectorMXBeans()) {
long gcCount = gcBean.getCollectionCount();
if (gcCount != -1) {
count += gcCount;
}
}
return count;
}
private long getUsedMemory() {
System.gc(); // Suggest GC before measuring memory usage; note System.gc() is only a hint and may not run a full collection
return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
}
private void runTest(String testName, Runnable testLogic) {
List<Long> times = new ArrayList<>();
List<Long> gcCounts = new ArrayList<>();
List<Long> memoryUsages = new ArrayList<>();
for (int i = 0; i < TEST_RUNS; i++) {
long startGC = getGCCount();
long startMemory = getUsedMemory();
long startTime = System.nanoTime();
testLogic.run();
long elapsedTime = System.nanoTime() - startTime;
long endGC = getGCCount();
long endMemory = getUsedMemory();
times.add(elapsedTime);
gcCounts.add(endGC - startGC);
memoryUsages.add(endMemory - startMemory);
System.out.println(testName + " - Run " + (i + 1) + " - Time: " + TimeUnit.NANOSECONDS.toMillis(elapsedTime) + " ms, GC: " + (endGC - startGC) + ", Memory: " + (endMemory - startMemory) + " bytes");
}
printStatistics(testName, times, gcCounts, memoryUsages);
}
private void printStatistics(String testName, List<Long> times, List<Long> gcCounts, List<Long> memoryUsages) {
System.out.println("========= " + testName + " Summary =========");
System.out.println("Average Time: " + TimeUnit.NANOSECONDS.toMillis(average(times)) + " ms");
System.out.println("Median Time: " + TimeUnit.NANOSECONDS.toMillis(median(times)) + " ms");
System.out.println("P99 Time: " + TimeUnit.NANOSECONDS.toMillis(percentile(times, 99)) + " ms");
System.out.println("Max Time: " + TimeUnit.NANOSECONDS.toMillis(Collections.max(times)) + " ms");
System.out.println("Average GC: " + average(gcCounts));
System.out.println("Median GC: " + median(gcCounts));
System.out.println("P99 GC: " + percentile(gcCounts, 99));
System.out.println("Max GC: " + Collections.max(gcCounts));
System.out.println("Average Memory: " + average(memoryUsages) + " bytes");
System.out.println("Median Memory: " + median(memoryUsages) + " bytes");
System.out.println("P99 Memory: " + percentile(memoryUsages, 99) + " bytes");
System.out.println("Max Memory: " + Collections.max(memoryUsages) + " bytes");
System.out.println("====================================");
}
private long average(List<Long> values) {
return values.stream().mapToLong(Long::longValue).sum() / values.size();
}
private long median(List<Long> values) {
List<Long> sorted = new ArrayList<>(values);
Collections.sort(sorted);
int middle = sorted.size() / 2;
return (sorted.size() % 2 == 0) ? (sorted.get(middle - 1) + sorted.get(middle)) / 2 : sorted.get(middle);
}
private long percentile(List<Long> values, int percentile) {
List<Long> sorted = new ArrayList<>(values);
Collections.sort(sorted);
int index = (int) Math.ceil(percentile / 100.0 * sorted.size()) - 1;
return sorted.get(Math.max(0, Math.min(index, sorted.size() - 1)));
}
@Test
public void testObjectPoolingPerformance() {
runTest("Object Pooling", () -> {
for (int i = 0; i < ITERATIONS; i++) {
EntryWrapper<Integer, String> entry = EntryWrapper.create(i, "Value" + i, i);
entry.recycle();
}
});
}
@Test
public void testNewObjectPerformance() {
runTest("New Object Creation", () -> {
for (int i = 0; i < ITERATIONS; i++) {
var str = new String("Value" + i); // Creates a new String object each time; the unused result may be eliminated by the JIT, skewing results
}
});
}
@Test
public void testConcurrentObjectPooling() {
runTest("Concurrent Pooling", () -> {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
List<Future<?>> futures = new ArrayList<>();
for (int t = 0; t < THREAD_COUNT; t++) {
futures.add(executor.submit(() -> {
for (int i = 0; i < ITERATIONS / THREAD_COUNT; i++) {
EntryWrapper<Integer, String> entry = EntryWrapper.create(i, "Value" + i, i);
entry.recycle();
}
}));
}
futures.forEach(f -> {
try {
f.get();
} catch (Exception ignored) {}
});
executor.shutdown();
});
}
@Test
public void testConcurrentNewObjectCreation() {
runTest("Concurrent New Object", () -> {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
List<Future<?>> futures = new ArrayList<>();
for (int t = 0; t < THREAD_COUNT; t++) {
futures.add(executor.submit(() -> {
for (int i = 0; i < ITERATIONS / THREAD_COUNT; i++) {
var str = new String("Value" + i); // Creates a new String object each time; the unused result may be eliminated by the JIT, skewing results
}
}));
}
futures.forEach(f -> {
try {
f.get();
} catch (Exception ignored) {}
});
executor.shutdown();
});
}
}
Sharing some test results: object pooling vs. JVM young GC performance test
+1
Thank you @heesung-sn for conducting these performance tests comparing object pooling and JVM young generation garbage collection. While the initial results are interesting, I have some concerns about the reliability of manual benchmarking on the JVM. Microbenchmarks are particularly susceptible to JVM optimizations and warmup effects that can skew results. I'd strongly recommend using JMH (Java Microbenchmark Harness) for this type of analysis, as it handles JVM warmup, runs measurements in forked JVMs, and guards against dead-code elimination that would otherwise invalidate results.
The good news is that we already have JMH configured in our project at https://github.com/apache/pulsar/tree/master/microbench, so adding new benchmarks should be straightforward. I also have some specific suggestions for the next round of testing.
Once we have these JMH results, we can make a data-driven decision about whether to remove the recycled objects usage from RangeCache entirely. Let me know if you'd like help setting up these JMH benchmarks.
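To make that concrete, here is a minimal sketch of what such a benchmark could look like. The class and package names are hypothetical, it assumes the EntryWrapper implementation from the test above is visible to the benchmark, and the actual setup should follow the existing microbench module conventions:

package org.apache.pulsar.microbench; // hypothetical package for this sketch

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 3, time = 1)   // let the JIT compile hot paths before measuring
@Measurement(iterations = 5, time = 1)
@Fork(2)                            // forked JVMs isolate runs from each other
@State(Scope.Benchmark)
public class EntryWrapperRecyclerBenchmark {

    @Benchmark
    @Threads(10) // roughly matches the THREAD_COUNT used in the manual test
    public void objectPooling(Blackhole bh) {
        // Small boxed Integer keys share cached instances, so the wrapper's
        // reference-based key check passes here.
        EntryWrapper<Integer, String> entry = EntryWrapper.create(1, "Value", 1);
        bh.consume(entry.getValue(1));
        entry.recycle();
    }

    @Benchmark
    @Threads(10)
    public void plainAllocation(Blackhole bh) {
        // Blackhole.consume prevents the JIT from eliminating the allocation
        bh.consume(new StringBuilder("Value").append(1).toString());
    }
}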
…y wouldn't ever match (apache#23903) (cherry picked from commit b6cfecc) (cherry picked from commit c11ebe1)
Fixes #23900
Motivation
During work on PR #23901, a severe issue was discovered in the RangeCache implementation: cache lookups consistently fail whenever the key used to look up an entry is a different instance from the one used to put the entry into the cache.
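To illustrate the failure mode, here is a minimal, self-contained sketch (the Key type below is hypothetical, not the actual Pulsar position class): the skip-list lookup itself finds the entry via compareTo, but a wrapper that compares the stored key by reference reports a miss for any equal-but-distinct key instance.

import java.util.concurrent.ConcurrentSkipListMap;

public class KeyIdentityDemo {
    // Stand-in for a (ledgerId, entryId) position key; records get equals/hashCode for free
    record Key(long ledgerId, long entryId) implements Comparable<Key> {
        @Override
        public int compareTo(Key o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Key, String> cache = new ConcurrentSkipListMap<>();
        Key original = new Key(1L, 0L);
        cache.put(original, "entry");

        Key lookup = new Key(1L, 0L); // equal to 'original', but a different instance
        String value = cache.get(lookup); // succeeds: the map compares keys with compareTo
        // A wrapper that guards the value with a reference check turns this hit into a miss:
        boolean wrapperMatches = (original == lookup); // false
        System.out.println("map hit: " + (value != null) + ", wrapper match: " + wrapperMatches);
    }
}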
One detail of the current broker cache implementation is that it is necessary to configure cacheEvictionByMarkDeletedPosition=true so that cached entries don't get evicted by the read position in the managed ledger.

This problematic solution was introduced in PR #22789, which aimed to address race conditions in the RangeCache and prevent the use of already recycled object instances. The solution underwent subsequent refactoring in PRs #22814 and #22818. While the value wrapper solution ensures consistency between cached entries and their corresponding keys, the original design primarily focused on the getRange and removeRange methods.

In these method implementations, the ConcurrentSkipListMap's subMap method ensures the original key remains available, preventing the lookup problem from manifesting. This explains why the issue remained undetected for so long: caching functions correctly in most scenarios.
The impact primarily affects single reads, such as the initial reads of replay queue entries. Single reads have never added entries to the cache; that bug was originally reported as part of #23504 and subsequently moved to #23900.
Issue #23900 represents a regression: prior to the changes introduced in PRs #22789, #22814, and #22818, replay queue entries might have been cached because previous reads had added entries to the cache. However, the functionality to add replay queue reads themselves to the cache has never been implemented.
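As a minimal sketch of the kind of change involved (an illustration based on the description above, not necessarily the actual patch), the wrapper's key check can keep the reference comparison as a fast path and fall back to equals() for correctness. The fields and lock follow the EntryWrapper shown earlier in this thread:

// Hypothetical sketch of the corrected lookup in EntryWrapper
V getValue(K key) {
    long stamp = lock.tryOptimisticRead();
    K localKey = this.key;
    V localValue = this.value;
    if (!lock.validate(stamp)) {
        stamp = lock.readLock();
        localKey = this.key;
        localValue = this.value;
        lock.unlockRead(stamp);
    }
    // Reference check as a fast path, equals() as the correctness fallback,
    // so an equal-but-distinct key instance still matches
    if (localKey != key && (localKey == null || !localKey.equals(key))) {
        return null;
    }
    return localValue;
}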
Modifications