Skip to content

Commit

Permalink
Swap to a random number generator that doesn't use atomics/synchroniz…
Browse files Browse the repository at this point in the history
…ation since the Java Random and ThreadLocalRandom are thread safe and use atomics and/or synchonization internally.

This is for apache#21250
  • Loading branch information
lukecwik committed Feb 18, 2023
1 parent 208763d commit b8536da
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p48p1.io.netty.util.internal.ThreadLocalRandom;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;

Expand Down Expand Up @@ -524,7 +525,12 @@ static class SamplingSizeEstimator implements SizeEstimator {

private SamplingSizeEstimator(
SizeEstimator underlying, double minSampleRate, double maxSampleRate) {
this(underlying, minSampleRate, maxSampleRate, DEFAULT_MIN_SAMPLED, new Random());
this(
underlying,
minSampleRate,
maxSampleRate,
DEFAULT_MIN_SAMPLED,
ThreadLocalRandom.current());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.vendor.grpc.v1p48p1.io.netty.util.internal.ThreadLocalRandom;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/**
Expand Down Expand Up @@ -393,89 +394,58 @@ public double getProgress() {
}
}

private static class SampleByteSizeDistribution<T> {
public static class SampleByteSizeDistribution<T> extends ElementByteSizeObserver {
/** Basic implementation of {@link ElementByteSizeObserver} for use in size estimation. */
private static class ByteSizeObserver extends ElementByteSizeObserver {
private long observedSize = 0;

@Override
protected void reportElementSize(long elementSize) {
observedSize += elementSize;
}
@Override
protected void reportElementSize(long elementSize) {
distribution.update(elementSize);
}

final Distribution distribution;
ByteSizeObserver byteCountObserver;

public SampleByteSizeDistribution(Distribution distribution) {
this.distribution = distribution;
this.byteCountObserver = null;
}

public void tryUpdate(T value, Coder<T> coder) throws Exception {
if (shouldSampleElement()) {
// First try using byte size observer
byteCountObserver = new ByteSizeObserver();
coder.registerByteSizeObserver(value, byteCountObserver);
coder.registerByteSizeObserver(value, this);

if (!byteCountObserver.getIsLazy()) {
byteCountObserver.advance();
this.distribution.update(byteCountObserver.observedSize);
if (!getIsLazy()) {
advance();
}
} else {
byteCountObserver = null;
}
}

public void finishLazyUpdate() {
// Advance lazy ElementByteSizeObservers, if any.
if (byteCountObserver != null && byteCountObserver.getIsLazy()) {
byteCountObserver.advance();
this.distribution.update(byteCountObserver.observedSize);
// Note that user's code is allowed to store the element of one
// DoFn.processElement() call and access it later on. We are still
// calling next() here, causing an update to byteCount. If user's
// code really accesses more element's pieces later on, their byte
// count would accrue against a future element. This is not ideal,
// but still approximately correct.
if (getIsLazy()) {
advance();
}
}

private static final int RESERVOIR_SIZE = 10;
private static final int SAMPLING_THRESHOLD = 30;
private long samplingToken = 0;
private long nextSamplingToken = 0;
private Random randomGenerator = new Random();
// Lowest sampling probability: 0.001%.
private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
private static final int SAMPLING_CUTOFF = 10;
private int samplingToken = 0;
private Random randomGenerator = ThreadLocalRandom.current();

private boolean shouldSampleElement() {
// Sampling probability decreases as the element count is increasing.
// We unconditionally sample the first samplingCutoff elements. Calculating
// nextInt(samplingToken) for each element is expensive, so after a threshold, calculate the
// gap to next sample.
// https://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/

// Reset samplingToken if it's going to exceed the max value.
if (samplingToken + 1 == Long.MAX_VALUE) {
samplingToken = 0;
nextSamplingToken = getNextSamplingToken(samplingToken);
}

samplingToken++;
// Use traditional sampling until the threshold of 30
if (nextSamplingToken == 0) {
if (samplingToken <= RESERVOIR_SIZE
|| randomGenerator.nextInt((int) samplingToken) < RESERVOIR_SIZE) {
if (samplingToken > SAMPLING_THRESHOLD) {
nextSamplingToken = getNextSamplingToken(samplingToken);
}
return true;
}
} else if (samplingToken >= nextSamplingToken) {
nextSamplingToken = getNextSamplingToken(samplingToken);
return true;
}
return false;
}

private long getNextSamplingToken(long samplingToken) {
double gap =
Math.log(1.0 - randomGenerator.nextDouble())
/ Math.log(1.0 - RESERVOIR_SIZE / (double) samplingToken);
return samplingToken + (int) gap;
// We unconditionally sample the first samplingCutoff elements. For the
// next samplingCutoff elements, the sampling probability drops from 100%
// to 50%. The probability of sampling the Nth element is:
// min(1, samplingCutoff / N), with an additional lower bound of
// samplingCutoff / samplingTokenUpperBound. This algorithm may be refined
// later.
samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
}
}
}

0 comments on commit b8536da

Please sign in to comment.