
[fix][broker] Fix bug in RangeCache where different instance of the key wouldn't ever match #23903

Merged
merged 7 commits into apache:master on Jan 28, 2025

Conversation

lhotari
Member

@lhotari lhotari commented Jan 28, 2025

Fixes #23900

Motivation

During work on PR #23901, a severe issue was discovered in the RangeCache implementation. Cache lookups consistently fail when the key instance used to look up an entry differs from the key instance that was used to put the entry into the cache.

One detail of the current broker cache implementation is that cacheEvictionByMarkDeletedPosition=true must be configured so that cached entries don't get evicted based on the read position in the managed ledger.

This problematic solution was introduced in PR #22789, which aimed to address race conditions in the RangeCache and prevent the use of already recycled object instances. The solution underwent subsequent refactoring in PRs #22814 and #22818. While the value wrapper solution ensures consistency between cached entries and their corresponding keys, the original design primarily focused on the getRange and removeRange methods.

In these method implementations, the ConcurrentSkipListMap's subMap method ensures the original key remains available, preventing the lookup problem from manifesting. This explains why the issue remained undetected for so long, as caching functions correctly in most scenarios.
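The failure mode can be illustrated with a minimal, hypothetical wrapper (the class and method names below are illustrative, not the actual RangeCache code): a lookup that validates the stored key with reference equality (`==`) can never match an equal-but-distinct key instance, while a validation based on `equals()` can.

```java
import java.util.Objects;

// Hypothetical simplification of the cache entry wrapper (illustrative names,
// not the actual RangeCache code).
final class CachedEntry<K, V> {
    private final K key;   // the key instance captured at put() time
    private final V value;

    CachedEntry(K key, V value) {
        this.key = key;
        this.value = value;
    }

    // Pre-fix behavior: identity comparison, so only the exact same key
    // instance that was used at put() time can ever match.
    V getValueIdentity(K lookupKey) {
        return key == lookupKey ? value : null;
    }

    // Post-fix behavior: value equality, so any equal key instance matches.
    V getValueEquals(K lookupKey) {
        return Objects.equals(key, lookupKey) ? value : null;
    }
}
```

Two `new String("3:7")` instances are `equals()` but not `==`, so the identity-based validation misses even when the map layer has already located the correct entry.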

The impact primarily affects single reads, such as the initial reads of replay queue entries. Single reads have never added entries to the cache; that bug was originally reported as part of #23504 and subsequently moved to #23900.

Issue #23900 represents a regression.
Prior to the changes introduced in PRs #22789, #22814, and #22818, replay queue entries might have been cached due to previous reads adding entries to the cache. However, the functionality to add replay queue reads to the cache has never been implemented.

Modifications

  • Add a unit test to verify that an entry can be looked up with a different key instance
  • Fix the issue in the RangeCache implementation
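The shape of the lookup path, and of the fix, can be sketched as follows (a simplified stand-in using `ConcurrentSkipListMap` with assumed names; the real RangeCache code differs):

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;

// Simplified stand-in for the RangeCache lookup path (assumed names).
// The skip list locates the wrapper via compareTo()/equals(); the wrapper
// then re-validates the key to guard against concurrent recycling.
class RangeCacheSketch {
    private static final class Wrapper {
        final String key;   // key instance stored at put() time
        final String value;
        Wrapper(String key, String value) { this.key = key; this.value = value; }
    }

    private final ConcurrentSkipListMap<String, Wrapper> entries = new ConcurrentSkipListMap<>();

    void put(String key, String value) {
        entries.put(key, new Wrapper(key, value));
    }

    String get(String key) {
        Wrapper w = entries.get(key);
        // Fixed validation: equals() instead of the pre-fix `w.key != key`,
        // which rejected every lookup made with a different key instance.
        if (w == null || !Objects.equals(w.key, key)) {
            return null;
        }
        return w.value;
    }
}
```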

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.1.0 milestone Jan 28, 2025
@lhotari lhotari self-assigned this Jan 28, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 28, 2025
@lhotari lhotari requested a review from heesung-sn January 28, 2025 01:08
@codecov-commenter

codecov-commenter commented Jan 28, 2025

Codecov Report

Attention: Patch coverage is 30.76923% with 9 lines in your changes missing coverage. Please review.

Project coverage is 74.18%. Comparing base (bbc6224) to head (c67c02c).
Report is 872 commits behind head on master.

Files with missing lines Patch % Lines
...org/apache/bookkeeper/mledger/util/RangeCache.java 30.76% 4 Missing and 5 partials ⚠️
Additional details and impacted files


@@             Coverage Diff              @@
##             master   #23903      +/-   ##
============================================
+ Coverage     73.57%   74.18%   +0.60%     
+ Complexity    32624    31835     -789     
============================================
  Files          1877     1854      -23     
  Lines        139502   143653    +4151     
  Branches      15299    16321    +1022     
============================================
+ Hits         102638   106568    +3930     
+ Misses        28908    28701     -207     
- Partials       7956     8384     +428     
Flag Coverage Δ
inttests 26.68% <30.76%> (+2.10%) ⬆️
systests 23.11% <30.76%> (-1.21%) ⬇️
unittests 73.69% <30.76%> (+0.85%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
...org/apache/bookkeeper/mledger/util/RangeCache.java 75.12% <30.76%> (-20.28%) ⬇️

... and 1033 files with indirect coverage changes

@lhotari
Member Author

lhotari commented Jan 28, 2025

To prevent future regressions like #23900, it would be necessary to add tests specifically for this purpose, ensuring that replay queue reads don't cause cache misses in a tailing read scenario. The regression is specific to tailing reads.

In the current broker cache implementation, catch-up reads will result in cache misses for replay queue reads. This behavior is not a regression, but it has a performance impact at least on Key_Shared subscriptions, where it's common for entries to get added to the replay queue. Adding caching for that type of scenario as well is a future improvement that I'm planning to address.

Since we don't currently have sufficient test coverage for the various scenarios, regressions like #23900 can happen.
The goal of recent test framework improvements such as #23875 and #23892 is to make it easier to write tests for this type of scenario.

@lhotari lhotari requested a review from heesung-sn January 28, 2025 18:04
@lhotari lhotari merged commit b6cfecc into apache:master Jan 28, 2025
54 checks passed
Contributor

@heesung-sn heesung-sn left a comment


Lgtm

lhotari added a commit that referenced this pull request Jan 28, 2025
…y wouldn't ever match (#23903)

(cherry picked from commit b6cfecc)
lhotari added a commit that referenced this pull request Jan 29, 2025
…y wouldn't ever match (#23903)

(cherry picked from commit b6cfecc)
lhotari added a commit that referenced this pull request Jan 29, 2025
…y wouldn't ever match (#23903)

(cherry picked from commit b6cfecc)
@heesung-sn
Contributor

package org.apache.bookkeeper.mledger.util;

import io.netty.util.Recycler;

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

import org.testng.annotations.Test;


public class EntryWrapperPerformanceTestNG {

    private static class EntryWrapper<K, V> {
        private final Recycler.Handle<EntryWrapper> recyclerHandle;
        private static final Recycler<EntryWrapper> RECYCLER = new Recycler<EntryWrapper>() {
            @Override
            protected EntryWrapper newObject(Handle<EntryWrapper> recyclerHandle) {
                return new EntryWrapper(recyclerHandle);
            }
        };
        private final StampedLock lock = new StampedLock();
        private K key;
        private V value;
        long size;

        private EntryWrapper(Recycler.Handle<EntryWrapper> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        static <K, V> EntryWrapper<K, V> create(K key, V value, long size) {
            EntryWrapper<K, V> entryWrapper = RECYCLER.get();
            long stamp = entryWrapper.lock.writeLock();
            entryWrapper.key = key;
            entryWrapper.value = value;
            entryWrapper.size = size;
            entryWrapper.lock.unlockWrite(stamp);
            return entryWrapper;
        }

        K getKey() {
            long stamp = lock.tryOptimisticRead();
            K localKey = key;
            if (!lock.validate(stamp)) {
                stamp = lock.readLock();
                localKey = key;
                lock.unlockRead(stamp);
            }
            return localKey;
        }

        V getValue(K key) {
            long stamp = lock.tryOptimisticRead();
            K localKey = this.key;
            V localValue = this.value;
            if (!lock.validate(stamp)) {
                stamp = lock.readLock();
                localKey = this.key;
                localValue = this.value;
                lock.unlockRead(stamp);
            }
            if (localKey != key) { // reference equality: the pre-fix lookup behavior
                return null;
            }
            return localValue;
        }

        long getSize() {
            long stamp = lock.tryOptimisticRead();
            long localSize = size;
            if (!lock.validate(stamp)) {
                stamp = lock.readLock();
                localSize = size;
                lock.unlockRead(stamp);
            }
            return localSize;
        }

        void recycle() {
            key = null;
            value = null;
            size = 0;
            recyclerHandle.recycle(this);
        }
    }


    private static final int ITERATIONS = 1_000_000;
    private static final int THREAD_COUNT = 10; // Simulating 10 concurrent threads
    private static final int TEST_RUNS = 100; // Run each test 100 times

    private long getGCCount() {
        long count = 0;
        for (GarbageCollectorMXBean gcBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            long gcCount = gcBean.getCollectionCount();
            if (gcCount != -1) {
                count += gcCount;
            }
        }
        return count;
    }

    private long getUsedMemory() {
        System.gc(); // Suggest GC before measuring memory usage
        return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
    }

    private void runTest(String testName, Runnable testLogic) {
        List<Long> times = new ArrayList<>();
        List<Long> gcCounts = new ArrayList<>();
        List<Long> memoryUsages = new ArrayList<>();

        for (int i = 0; i < TEST_RUNS; i++) {
            long startGC = getGCCount();
            long startMemory = getUsedMemory();
            long startTime = System.nanoTime();

            testLogic.run();

            long elapsedTime = System.nanoTime() - startTime;
            long endGC = getGCCount();
            long endMemory = getUsedMemory();

            times.add(elapsedTime);
            gcCounts.add(endGC - startGC);
            memoryUsages.add(endMemory - startMemory);

            System.out.println(testName + " - Run " + (i + 1) + " - Time: " + TimeUnit.NANOSECONDS.toMillis(elapsedTime) + " ms, GC: " + (endGC - startGC) + ", Memory: " + (endMemory - startMemory) + " bytes");
        }

        printStatistics(testName, times, gcCounts, memoryUsages);
    }

    private void printStatistics(String testName, List<Long> times, List<Long> gcCounts, List<Long> memoryUsages) {
        System.out.println("========= " + testName + " Summary =========");
        System.out.println("Average Time: " + TimeUnit.NANOSECONDS.toMillis(average(times)) + " ms");
        System.out.println("Median Time: " + TimeUnit.NANOSECONDS.toMillis(median(times)) + " ms");
        System.out.println("P99 Time: " + TimeUnit.NANOSECONDS.toMillis(percentile(times, 99)) + " ms");
        System.out.println("Max Time: " + TimeUnit.NANOSECONDS.toMillis(Collections.max(times)) + " ms");

        System.out.println("Average GC: " + average(gcCounts));
        System.out.println("Median GC: " + median(gcCounts));
        System.out.println("P99 GC: " + percentile(gcCounts, 99));
        System.out.println("Max GC: " + Collections.max(gcCounts));

        System.out.println("Average Memory: " + average(memoryUsages) + " bytes");
        System.out.println("Median Memory: " + median(memoryUsages) + " bytes");
        System.out.println("P99 Memory: " + percentile(memoryUsages, 99) + " bytes");
        System.out.println("Max Memory: " + Collections.max(memoryUsages) + " bytes");
        System.out.println("====================================");
    }

    private long average(List<Long> values) {
        return values.stream().mapToLong(Long::longValue).sum() / values.size();
    }

    private long median(List<Long> values) {
        List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int middle = sorted.size() / 2;
        return (sorted.size() % 2 == 0) ? (sorted.get(middle - 1) + sorted.get(middle)) / 2 : sorted.get(middle);
    }

    private long percentile(List<Long> values, int percentile) {
        List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int index = (int) Math.ceil(percentile / 100.0 * sorted.size()) - 1;
        return sorted.get(Math.max(0, Math.min(index, sorted.size() - 1)));
    }

    @Test
    public void testObjectPoolingPerformance() {
        runTest("Object Pooling", () -> {
            for (int i = 0; i < ITERATIONS; i++) {
                EntryWrapper<Integer, String> entry = EntryWrapper.create(i, "Value" + i, i);
                entry.recycle();
            }
        });
    }

    @Test
    public void testNewObjectPerformance() {
        runTest("New Object Creation", () -> {
            for (int i = 0; i < ITERATIONS; i++) {
                var str = new String("Value" + i); // Creates a new String object each time
            }
        });
    }

    @Test
    public void testConcurrentObjectPooling() {
        runTest("Concurrent Pooling", () -> {
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
            List<Future<?>> futures = new ArrayList<>();

            for (int t = 0; t < THREAD_COUNT; t++) {
                futures.add(executor.submit(() -> {
                    for (int i = 0; i < ITERATIONS / THREAD_COUNT; i++) {
                        EntryWrapper<Integer, String> entry = EntryWrapper.create(i, "Value" + i, i);
                        entry.recycle();
                    }
                }));
            }

            futures.forEach(f -> {
                try {
                    f.get();
                } catch (Exception ignored) {}
            });

            executor.shutdown();
        });
    }

    @Test
    public void testConcurrentNewObjectCreation() {
        runTest("Concurrent New Object", () -> {
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
            List<Future<?>> futures = new ArrayList<>();

            for (int t = 0; t < THREAD_COUNT; t++) {
                futures.add(executor.submit(() -> {
                    for (int i = 0; i < ITERATIONS / THREAD_COUNT; i++) {
                        var str = new String("Value" + i); // Creates a new String object each time
                    }
                }));
            }

            futures.forEach(f -> {
                try {
                    f.get();
                } catch (Exception ignored) {}
            });

            executor.shutdown();
        });
    }
}
========= Concurrent New Object Summary =========
Average Time: 23 ms
Median Time: 22 ms
P99 Time: 32 ms
Max Time: 255 ms
Average GC: 4
Median GC: 4
P99 GC: 4
Max GC: 4
Average Memory: -20971 bytes
Median Memory: 0 bytes
P99 Memory: 0 bytes
Max Memory: 0 bytes
====================================

========= Concurrent Pooling Summary =========
Average Time: 40 ms
Median Time: 36 ms
P99 Time: 77 ms
Max Time: 439 ms
Average GC: 4
Median GC: 4
P99 GC: 4
Max GC: 4
Average Memory: -713031 bytes
Median Memory: 0 bytes
P99 Memory: 2097152 bytes
Max Memory: 2097152 bytes
====================================


========= New Object Creation Summary =========
Average Time: 10 ms
Median Time: 10 ms
P99 Time: 22 ms
Max Time: 40 ms
Average GC: 4
Median GC: 4
P99 GC: 4
Max GC: 4
Average Memory: -524288 bytes
Median Memory: 0 bytes
P99 Memory: 2097152 bytes
Max Memory: 2097152 bytes
====================================


========= Object Pooling Summary =========
Average Time: 54 ms
Median Time: 53 ms
P99 Time: 66 ms
Max Time: 92 ms
Average GC: 4
Median GC: 4
P99 GC: 4
Max GC: 4
Average Memory: -796917 bytes
Median Memory: 0 bytes
P99 Memory: 0 bytes
Max Memory: 2097152 bytes
====================================

Sharing some test results: object pooling vs. JVM young-GC performance test

Contributor

@eolivelli eolivelli left a comment


+1

@lhotari
Member Author

lhotari commented Jan 29, 2025

Sharing some test result: object pooling vs jvm young gc performance test

Thank you @heesung-sn for conducting these performance tests comparing object pooling and JVM young generation garbage collection. While the initial results are interesting, I have some concerns about the reliability of manual benchmarking on the JVM. Microbenchmarks are particularly susceptible to JVM optimizations and warmup effects that can skew results. I'd strongly recommend using JMH (Java Microbenchmark Harness) for this type of analysis, as it:

  • Handles JVM warmup properly
  • Provides detailed GC statistics
  • Accounts for common benchmarking pitfalls
  • Offers consistent methodology

The good news is that we already have JMH configured in our project at https://github.com/apache/pulsar/tree/master/microbench, so adding new benchmarks should be straightforward.

Some specific suggestions for the next round of testing:

  • Create comparative benchmarks for RangeCache with and without object recycling
  • Include tests with both G1GC and ZGC ZGenerational on Java 21 to understand performance across different garbage collectors

Once we have these JMH results, we can make a data-driven decision about whether to remove the recycled objects usage from RangeCache entirely. Let me know if you'd like help setting up these JMH benchmarks.
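A benchmark along these lines could live in the microbench module. The sketch below is illustrative only: the class name, method names, and parameter choices are assumptions, and it requires the JMH dependency that the microbench module already provides, so it is not runnable standalone.

```java
import io.netty.util.Recycler;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

// Hypothetical JMH benchmark comparing a Netty-Recycler-pooled wrapper against
// plain allocation left to the young-generation GC.
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 1)
@Measurement(iterations = 5, time = 1)
@Fork(1)
public class EntryWrapperRecyclingBenchmark {

    static final class PooledEntry {
        private final Recycler.Handle<PooledEntry> handle;
        Object key;
        Object value;

        private PooledEntry(Recycler.Handle<PooledEntry> handle) {
            this.handle = handle;
        }

        static final Recycler<PooledEntry> RECYCLER = new Recycler<PooledEntry>() {
            @Override
            protected PooledEntry newObject(Handle<PooledEntry> handle) {
                return new PooledEntry(handle);
            }
        };

        void recycle() {
            key = null;
            value = null;
            handle.recycle(this);
        }
    }

    @Benchmark
    public Object recycledWrapper() {
        PooledEntry e = PooledEntry.RECYCLER.get();
        e.key = "k";
        e.value = "v";
        Object v = e.value; // return the value so the work isn't dead-code eliminated
        e.recycle();
        return v;
    }

    @Benchmark
    public Object freshAllocation() {
        return new Object[] { "k", "v" }; // plain allocation, collected by the young gen
    }
}
```

Running the same comparison with `-XX:+UseG1GC` and with `-XX:+UseZGC -XX:+ZGenerational` on Java 21 would cover the GC variants mentioned above.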

nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Jan 31, 2025
…y wouldn't ever match (apache#23903)

(cherry picked from commit b6cfecc)

(cherry picked from commit c11ebe1)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Feb 3, 2025
…y wouldn't ever match (apache#23903)

(cherry picked from commit b6cfecc)

(cherry picked from commit c11ebe1)