Optimize PGBK table to only update cache when there is a large enough size change. #21250 (#25219)

* Optimize PGBK table to only update cache when there is a large enough size change. #21250

This prevents an expensive scenario where a user outputs lots of small values (e.g. ints) to be precombined; each value takes little to no space to store, so updating the cache on every insertion provides little value.

Note the 5-10x throughput improvement for all distributions except uniqueKeys (e.g. uniform/globallyWindowed goes from 8.306 to 88.740 ops/s, roughly 10.7x, while uniqueKeys only improves from roughly 4 to 6-7 ops/s). Early profiles show an issue with the G1 garbage collector when storing this many small values: GC management overhead dominates ~75% of execution time, which requires further investigation.

Before:
```
Benchmark                                                 (distribution)  (globallyWindowed)   Mode  Cnt   Score   Error  Units
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine         uniform                true  thrpt    5   8.306 ± 1.255  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine         uniform               false  thrpt    5   7.849 ± 0.476  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          normal                true  thrpt    5  10.575 ± 1.295  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          normal               false  thrpt    5  10.772 ± 0.141  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          hotKey                true  thrpt    5   9.131 ± 2.761  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          hotKey               false  thrpt    5   8.302 ± 1.078  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine      uniqueKeys                true  thrpt    5   3.899 ± 1.737  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine      uniqueKeys               false  thrpt    5   4.203 ± 2.170  ops/s

```

After:
```
Benchmark                                                 (distribution)  (globallyWindowed)   Mode  Cnt   Score   Error  Units
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine         uniform                true  thrpt    5  88.740 ± 8.925  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine         uniform               false  thrpt    5  76.005 ± 5.150  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          normal                true  thrpt    5  43.388 ± 1.966  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          normal               false  thrpt    5  37.804 ± 7.177  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          hotKey                true  thrpt    5  84.881 ± 5.040  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine          hotKey               false  thrpt    5  74.183 ± 2.063  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine      uniqueKeys                true  thrpt    5   5.567 ± 4.068  ops/s
PrecombineGroupingTableBenchmark.sumIntegerBinaryCombine      uniqueKeys               false  thrpt    5   6.957 ± 1.508  ops/s
```
lukecwik authored Feb 3, 2023
1 parent b1c9d8a commit c65e777
Showing 5 changed files with 117 additions and 58 deletions.
BeamModulePlugin.groovy:

@@ -1553,7 +1553,7 @@ class BeamModulePlugin implements Plugin<Project> {
       if (project.file("/opt/cprof/profiler_java_agent.so").exists()) {
         def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
         def userName = System.getProperty("user.name").toLowerCase().replaceAll(" ", "_")
-        jvmArgs '-agentpath:/opt/cprof/profiler_java_agent.so=-cprof_service=' + userName + "_" + project.getProperty("benchmark").toLowerCase() + '_' + System.currentTimeMillis() + ',-cprof_project_id=' + gcpProject + ',-cprof_zone_name=us-central1-a'
+        jvmArgs '-agentpath:/opt/cprof/profiler_java_agent.so=-cprof_service=' + userName + "_" + project.getProperty("benchmark").toLowerCase() + '_' + String.format('%1$tY%1$tm%1$td_%1$tH%1$tM%1$tS_%1$tL', System.currentTimeMillis()) + ',-cprof_project_id=' + gcpProject + ',-cprof_zone_name=us-central1-a'
       }
     } else {
       // We filter for only Apache Beam benchmarks to ensure that we aren't
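
For readers unfamiliar with the %1$t conversions, here is a minimal standalone sketch of what the service-name change above produces (the class name and example values are illustrative, not Beam code):

```
public class ProfilerServiceNameDemo {
  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // Before: the Cloud Profiler service id ended in raw epoch millis, e.g. ..._1675468800000
    String before = "lukecwik_precombinegroupingtablebenchmark_" + now;
    // After: it ends in a readable timestamp, e.g. ..._20230203_181230_123
    String after =
        "lukecwik_precombinegroupingtablebenchmark_"
            + String.format("%1$tY%1$tm%1$td_%1$tH%1$tM%1$tS_%1$tL", now);
    System.out.println(before);
    System.out.println(after);
  }
}
```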
PrecombineGroupingTableBenchmark.java:

@@ -21,7 +21,9 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import org.apache.beam.fn.harness.Cache;
 import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.fn.harness.Caches.ClearableCache;
 import org.apache.beam.fn.harness.PrecombineGroupingTable;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -36,15 +38,20 @@
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.infra.Blackhole;
 
 public class PrecombineGroupingTableBenchmark {
   private static final int TOTAL_VALUES = 1_000_000;
+  private static final int KEY_SPACE = 1_000;
 
   @State(Scope.Benchmark)
   public static class SumIntegerBinaryCombine {
     final Combine.BinaryCombineIntegerFn sumInts = Sum.ofIntegers();
     final PipelineOptions options = PipelineOptionsFactory.create();
+
+    final Cache<Object, Object> cache = Caches.fromOptions(options);
+
     List<WindowedValue<KV<String, Integer>>> elements;
 
     @Param({"true", "false"})
@@ -55,51 +62,60 @@ public static class SumIntegerBinaryCombine {
 
     @Setup(Level.Trial)
     public void setUp() {
-      // Use a stable seed to ensure consistency across benchmark runs
-      Random random = new Random(-2134890234);
-      elements = new ArrayList<>();
-      switch (distribution) {
-        case "uniform":
-          for (int i = 0; i < TOTAL_VALUES; ++i) {
-            int key = random.nextInt(1000);
-            elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key)));
-          }
-          break;
-        case "normal":
-          for (int i = 0; i < TOTAL_VALUES; ++i) {
-            int key = (int) (random.nextGaussian() * 1000);
-            elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key)));
-          }
-          break;
-        case "hotKey":
-          for (int i = 0; i < TOTAL_VALUES; ++i) {
-            int key;
-            if (random.nextBoolean()) {
-              key = 0;
-            } else {
-              key = random.nextInt(1000);
-            }
-            elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key)));
-          }
-          break;
-        case "uniqueKeys":
-          for (int i = 0; i < TOTAL_VALUES; ++i) {
-            elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(i), i)));
-          }
-          Collections.shuffle(elements, random);
-          break;
-        default:
-      }
+      this.elements = generateTestData(distribution);
     }
   }
 
+  private static List<WindowedValue<KV<String, Integer>>> generateTestData(String distribution) {
+    // Use a stable seed to ensure consistency across benchmark runs
+    Random random = new Random(-2134890234);
+    List<WindowedValue<KV<String, Integer>>> elements = new ArrayList<>();
+    switch (distribution) {
+      case "uniform":
+        for (int i = 0; i < TOTAL_VALUES; ++i) {
+          int key = random.nextInt(KEY_SPACE);
+          elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key)));
+        }
+        break;
+      case "normal":
+        for (int i = 0; i < TOTAL_VALUES; ++i) {
+          int key = (int) (random.nextGaussian() * KEY_SPACE);
+          elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key)));
+        }
+        break;
+      case "hotKey":
+        for (int i = 0; i < TOTAL_VALUES; ++i) {
+          int key;
+          if (random.nextBoolean()) {
+            key = -123814201;
+          } else {
+            key = random.nextInt(KEY_SPACE);
+          }
+          elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(key), key)));
+        }
+        break;
+      case "uniqueKeys":
+        for (int i = 0; i < TOTAL_VALUES; ++i) {
+          elements.add(WindowedValue.valueInGlobalWindow(KV.of(Integer.toString(i), i)));
+        }
+        Collections.shuffle(elements, random);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown distribution: " + distribution);
+    }
+    return elements;
+  }
 
   @Benchmark
+  @Threads(16)
   public void sumIntegerBinaryCombine(SumIntegerBinaryCombine table, Blackhole blackhole)
       throws Exception {
+    ClearableCache<Object, Object> cache =
+        new ClearableCache<>(Caches.subCache(table.cache, Thread.currentThread().getName()));
     PrecombineGroupingTable<String, Integer, int[]> groupingTable =
         PrecombineGroupingTable.combiningAndSampling(
            table.options,
-           Caches.eternal(),
+           cache,
            table.sumInts,
            StringUtf8Coder.of(),
            .001,
@@ -108,5 +124,6 @@ public void sumIntegerBinaryCombine(SumIntegerBinaryCombine table, Blackhole bla
       groupingTable.put(table.elements.get(i), blackhole::consume);
     }
     groupingTable.flush(blackhole::consume);
+    cache.clear();
   }
 }
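
The benchmark now runs 16 threads against a single shared @State instance, with each thread working in (and clearing) its own sub-cache namespace. A minimal JMH sketch of that shared-state pattern, using a plain ConcurrentHashMap as a stand-in for the Beam cache (names here are illustrative):

```
import java.util.concurrent.ConcurrentHashMap;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.infra.Blackhole;

public class SharedStateSketch {
  @State(Scope.Benchmark) // one instance shared by all benchmark threads
  public static class Shared {
    final ConcurrentHashMap<String, Long> cache = new ConcurrentHashMap<>();
  }

  @Benchmark
  @Threads(16)
  public void perThreadNamespace(Shared shared, Blackhole blackhole) {
    // Keying by thread name gives each thread its own slice of the shared map,
    // analogous to Caches.subCache(table.cache, Thread.currentThread().getName()).
    String namespace = Thread.currentThread().getName();
    blackhole.consume(shared.cache.merge(namespace, 1L, Long::sum));
  }
}
```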
Caches.java:

@@ -53,12 +53,25 @@ public final class Caches {
    */
   @VisibleForTesting static final int WEIGHT_RATIO = 6;
 
+  /** All objects less than or equal to this size will account for 1. */
+  private static final long MIN_OBJECT_SIZE = 1 << WEIGHT_RATIO;
+
+  /**
+   * Objects which change in this amount should always update the cache.
+   *
+   * <p>The limit of 2^16 is chosen to be small enough such that objects will be close enough if
+   * they change frequently. Future work could scale these ratios based upon the configured cache
+   * size.
+   */
+  private static final long CACHE_SIZE_CHANGE_LIMIT_BYTES = 1 << 16;
+
   private static final MemoryMeter MEMORY_METER =
       MemoryMeter.builder().withGuessing(Guess.BEST).build();
 
   /** The size of a reference. */
   public static final long REFERENCE_SIZE = 8;
 
   /** Returns the amount of memory in bytes the provided object consumes. */
   public static long weigh(Object o) {
     if (o == null) {
       return REFERENCE_SIZE;
@@ -73,6 +86,25 @@ public static long weigh(Object o) {
     }
   }
 
+  /**
+   * Returns whether the cache should be updated in the case where the object's size has changed.
+   *
+   * <p>Note that this should only be used in the case where the cache is being updated very often
+   * in a tight loop and is not a good fit for cases where the object being cached is the result of
+   * an expensive operation like a disk read or remote service call.
+   */
+  public static boolean shouldUpdateOnSizeChange(long oldSize, long newSize) {
+    /*
+    Our strategy is three fold:
+    - tiny objects don't impact the cache accounting and count as a size of `1` in the cache.
+    - large changes (>= CACHE_SIZE_CHANGE_LIMIT_BYTES) should always update the size
+    - all others if the size changed by a factor of 2
+    */
+    return (oldSize > MIN_OBJECT_SIZE || newSize > MIN_OBJECT_SIZE)
+        && ((newSize - oldSize >= CACHE_SIZE_CHANGE_LIMIT_BYTES)
+            || Long.highestOneBit(oldSize) != Long.highestOneBit(newSize));
+  }
+
   /** An eviction listener that reduces the size of entries that are {@link Shrinkable}. */
   @VisibleForTesting
   static class ShrinkOnEviction implements RemovalListener<CompositeKey, WeightedValue<Object>> {
@@ -184,8 +216,15 @@ public int weigh(CompositeKey key, WeightedValue<Object> value) {
                 // which is why we set the concurrency level to 1. See
                 // https://github.com/google/guava/issues/3462 for further details.
                 //
-                // The ProcessBundleBenchmark#testStateWithCaching shows no noticeable change
-                // when this parameter is left at the default.
+                // The PrecombineGroupingTable showed contention here since it was working in
+                // a tight loop. We were able to resolve the contention by reducing the
+                // frequency of updates. Reconsider this value if we could solve the maximum
+                // entry size issue. Note that using Runtime.getRuntime().availableProcessors()
+                // is subject to docker CPU shares issues
+                // (https://bugs.openjdk.org/browse/JDK-8281181).
+                //
+                // We could revisit the caffeine cache library based upon reinvestigating
+                // recursive computeIfAbsent calls since it doesn't have this limit.
                 .concurrencyLevel(1)
                 .recordStats(),
             weightInBytes)
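
To make the three-fold strategy concrete, here is a standalone sketch that mirrors shouldUpdateOnSizeChange with the constants from the diff (the demo class itself is illustrative, not the real Caches):

```
public class SizeChangeRuleDemo {
  static final long MIN_OBJECT_SIZE = 1 << 6; // WEIGHT_RATIO = 6
  static final long CACHE_SIZE_CHANGE_LIMIT_BYTES = 1 << 16;

  static boolean shouldUpdateOnSizeChange(long oldSize, long newSize) {
    return (oldSize > MIN_OBJECT_SIZE || newSize > MIN_OBJECT_SIZE)
        && ((newSize - oldSize >= CACHE_SIZE_CHANGE_LIMIT_BYTES)
            || Long.highestOneBit(oldSize) != Long.highestOneBit(newSize));
  }

  public static void main(String[] args) {
    System.out.println(shouldUpdateOnSizeChange(40, 60)); // false: both tiny (<= 64 bytes)
    System.out.println(shouldUpdateOnSizeChange(100, 120)); // false: same power-of-two bucket, small delta
    System.out.println(shouldUpdateOnSizeChange(100, 300)); // true: crossed a power-of-two boundary
    System.out.println(shouldUpdateOnSizeChange(1_000_000, 1_000_000 + (1 << 16))); // true: >= 64 KiB growth
  }
}
```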
PrecombineGroupingTable.java:

@@ -128,8 +128,7 @@ public interface SizeEstimator {
   private final AtomicLong maxWeight;
   private long weight;
   private final boolean isGloballyWindowed;
-  private long checkFlushCounter;
-  private long checkFlushLimit = -5;
+  private long lastWeightForFlush;
 
   private static final class Key implements Weighted {
     private static final Key INSTANCE = new Key();
@@ -407,12 +406,9 @@ public void put(
           return tableEntry;
         });
 
-    if (checkFlushCounter++ < checkFlushLimit) {
-      return;
-    } else {
-      checkFlushLimit = Math.min(checkFlushLimit + 1, 25);
-      checkFlushCounter = 0;
+    if (Caches.shouldUpdateOnSizeChange(lastWeightForFlush, weight)) {
       flushIfNeeded(receiver);
+      lastWeightForFlush = weight;
     }
   }
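
Before this change, put() used a counter that allowed a flush check at most once every ~26 insertions (tens of thousands of checks per million puts); now the check fires only when the table's weight has moved meaningfully. A rough standalone simulation, assuming a hypothetical 16-byte weight per small entry, shows the new check firing only a few hundred times per million puts:

```
public class FlushCheckSimulation {
  static final long MIN_OBJECT_SIZE = 1 << 6;
  static final long CACHE_SIZE_CHANGE_LIMIT_BYTES = 1 << 16;

  static boolean shouldUpdateOnSizeChange(long oldSize, long newSize) {
    return (oldSize > MIN_OBJECT_SIZE || newSize > MIN_OBJECT_SIZE)
        && ((newSize - oldSize >= CACHE_SIZE_CHANGE_LIMIT_BYTES)
            || Long.highestOneBit(oldSize) != Long.highestOneBit(newSize));
  }

  public static void main(String[] args) {
    long weight = 0;
    long lastWeightForFlush = 0;
    int flushChecks = 0;
    for (int i = 0; i < 1_000_000; i++) {
      weight += 16; // assumed weight of one small key/value entry
      if (shouldUpdateOnSizeChange(lastWeightForFlush, weight)) {
        flushChecks++;
        lastWeightForFlush = weight;
      }
    }
    // Prints a few hundred checks rather than one per put.
    System.out.println("flush checks for 1,000,000 puts: " + flushChecks);
  }
}
```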
PrecombineGroupingTableTest.java:

@@ -25,6 +25,7 @@
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 
@@ -156,19 +157,19 @@ public void testCombiningGroupingTableHonorsKeyWeights() throws Exception {
     assertThat(receiver.outputElems, empty());
 
     // Putting in other large keys should cause eviction.
-    table.put(valueInGlobalWindow(KV.of("BBB", 9)), receiver);
+    table.put(valueInGlobalWindow(KV.of("BB", 509)), receiver);
     table.put(valueInGlobalWindow(KV.of("CCC", 11)), receiver);
     assertThat(
         receiver.outputElems,
         containsInAnyOrder(
-            valueInGlobalWindow(KV.of("AAA", 1L + 2 + 4)), valueInGlobalWindow(KV.of("BBB", 9L))));
+            valueInGlobalWindow(KV.of("AAA", 1L + 2 + 4)), valueInGlobalWindow(KV.of("BB", 509L))));
 
     table.flush(receiver);
     assertThat(
         receiver.outputElems,
         containsInAnyOrder(
             valueInGlobalWindow(KV.of("AAA", 1L + 2 + 4)),
-            valueInGlobalWindow(KV.of("BBB", 9L)),
+            valueInGlobalWindow(KV.of("BB", 509L)),
             valueInGlobalWindow(KV.of("CCC", 11L))));
   }
 
@@ -225,17 +226,20 @@ public void testCombiningGroupingTableCompactionSaves() throws Exception {
 
     // Insert three compactable values which shouldn't lead to eviction even though we are over
     // the maximum size.
-    table.put(valueInGlobalWindow(KV.of("A", 1004)), receiver);
-    table.put(valueInGlobalWindow(KV.of("B", 1004)), receiver);
+    table.put(valueInGlobalWindow(KV.of("A", 804)), receiver);
+    table.put(valueInGlobalWindow(KV.of("B", 904)), receiver);
     table.put(valueInGlobalWindow(KV.of("C", 1004)), receiver);
     assertThat(receiver.outputElems, empty());
 
+    // Ensure that compaction occurred during the insertion of the above elements before flushing.
+    assertThat(table.getWeight(), lessThan(804L + 904L + 1004L));
+
     table.flush(receiver);
     assertThat(
         receiver.outputElems,
         containsInAnyOrder(
-            valueInGlobalWindow(KV.of("A", 1004L / 4)),
-            valueInGlobalWindow(KV.of("B", 1004L / 4)),
+            valueInGlobalWindow(KV.of("A", 804L / 4)),
+            valueInGlobalWindow(KV.of("B", 904L / 4)),
             valueInGlobalWindow(KV.of("C", 1004L / 4))));
   }
 
@@ -254,20 +258,20 @@ public void testCombiningGroupingTablePartialEviction() throws Exception {
 
     // Insert three values which even with compaction isn't enough so we evict A & B to get
     // under the max weight.
-    table.put(valueInGlobalWindow(KV.of("A", 1001)), receiver);
-    table.put(valueInGlobalWindow(KV.of("B", 1001)), receiver);
+    table.put(valueInGlobalWindow(KV.of("A", 801)), receiver);
+    table.put(valueInGlobalWindow(KV.of("B", 901)), receiver);
     table.put(valueInGlobalWindow(KV.of("C", 1001)), receiver);
     assertThat(
         receiver.outputElems,
         containsInAnyOrder(
-            valueInGlobalWindow(KV.of("A", 1001L)), valueInGlobalWindow(KV.of("B", 1001L))));
+            valueInGlobalWindow(KV.of("A", 801L)), valueInGlobalWindow(KV.of("B", 901L))));
 
     table.flush(receiver);
     assertThat(
         receiver.outputElems,
         containsInAnyOrder(
-            valueInGlobalWindow(KV.of("A", 1001L)),
-            valueInGlobalWindow(KV.of("B", 1001L)),
+            valueInGlobalWindow(KV.of("A", 801L)),
+            valueInGlobalWindow(KV.of("B", 901L)),
             valueInGlobalWindow(KV.of("C", 1001L))));
   }
 
@@ -460,7 +464,10 @@ protected boolean matchesSafely(T item, Description mismatchDescription) {
     };
   }
 
-  /** "Estimate" the size of strings by taking the tenth power of their length. */
+  /**
+   * Used to simulate very specific compaction/eviction tests under certain scenarios instead of
+   * relying on JAMM for size estimation. Strings are 10^length and longs are their value.
+   */
   private static class TestSizeEstimator implements SizeEstimator {
     int calls = 0;
 
