Skip to content

Commit

Permalink
AsyncReporter/SpanHandler: make queuedMaxBytes=0 disable pre-flight s…
Browse files Browse the repository at this point in the history
…ize checks (#260)

Signed-off-by: Andriy Redko <[email protected]>
Signed-off-by: Adrian Cole <[email protected]>
Co-authored-by: Adrian Cole <[email protected]>
  • Loading branch information
reta and Adrian Cole authored Apr 12, 2024
1 parent 0c62f8b commit f30ee66
Show file tree
Hide file tree
Showing 10 changed files with 588 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -24,6 +26,7 @@
import org.openjdk.jmh.annotations.Warmup;
import zipkin2.Span;
import zipkin2.TestObjects;
import zipkin2.reporter.BytesEncoder;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.InMemoryReporterMetrics;
import zipkin2.reporter.SpanBytesEncoder;
Expand All @@ -42,6 +45,9 @@ public class AsyncReporterBenchmarks {
@Param
public Encoding encoding;

@Param({"0", "20000000"})
public int maxBytes;

@AuxCounters
@State(Scope.Thread)
public static class InMemoryReporterMetricsAsCounters {
Expand Down Expand Up @@ -77,10 +83,17 @@ public void clean() {

@Setup(Level.Trial)
public void setup() {
final BytesEncoder<Span> encoder = Stream
.of(SpanBytesEncoder.JSON_V2, SpanBytesEncoder.PROTO3, SpanBytesEncoder.THRIFT)
.filter(e -> e.encoding().equals(encoding))
.findAny()
.orElseThrow(() -> new IllegalStateException("Unable to find BytesEncoder<Span> for " + encoding));

reporter = AsyncReporter.newBuilder(new NoopSender(encoding))
.messageMaxBytes(1000000) // example default from Kafka message.max.bytes
.queuedMaxBytes(maxBytes)
.metrics(metrics)
.build(SpanBytesEncoder.JSON_V2);
.build(encoder);
}

@Benchmark @Group("no_contention") @GroupThreads(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.reporter.internal;

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Group)
public class BoundedQueueBenchmarks {
static final byte ONE = 1;

@Param( {"0", "10000"})
public int maxBytes;

@AuxCounters
@State(Scope.Thread)
public static class OfferCounters {
public int offersFailed;
public int offersMade;

@Setup(Level.Iteration)
public void clean() {
offersFailed = offersMade = 0;
}
}

@AuxCounters
@State(Scope.Thread)
public static class DrainCounters {
public int drained;

@Setup(Level.Iteration)
public void clean() {
drained = 0;
}
}

private static ThreadLocal<Object> marker = new ThreadLocal<>();

@State(Scope.Thread)
public static class ConsumerMarker {
public ConsumerMarker() {
marker.set(this);
}
}

BoundedQueue<Byte> q;

@Setup
public void setup() {
q = BoundedQueue.create(null, null, null, 10000, 10000, maxBytes);
}

@Benchmark @Group("no_contention") @GroupThreads(1)
public void no_contention_offer(OfferCounters counters) {
if (q.offer(ONE, 1)) {
counters.offersMade++;
} else {
counters.offersFailed++;
}
}

@Benchmark @Group("no_contention") @GroupThreads(1)
public void no_contention_drain(DrainCounters counters, ConsumerMarker cm) {
q.drainTo((s, b) -> {
counters.drained++;
return true;
}, 1000);
}

@Benchmark @Group("mild_contention") @GroupThreads(2)
public void mild_contention_offer(OfferCounters counters) {
if (q.offer(ONE, 1)) {
counters.offersMade++;
} else {
counters.offersFailed++;
}
}

@Benchmark @Group("mild_contention") @GroupThreads(1)
public void mild_contention_drain(DrainCounters counters, ConsumerMarker cm) {
q.drainTo((s, b) -> {
counters.drained++;
return true;
}, 1000);
}

@Benchmark @Group("high_contention") @GroupThreads(8)
public void high_contention_offer(OfferCounters counters) {
if (q.offer(ONE, 1)) {
counters.offersMade++;
} else {
counters.offersFailed++;
}
}

@Benchmark @Group("high_contention") @GroupThreads(1)
public void high_contention_drain(DrainCounters counters, ConsumerMarker cm) {
q.drainTo((s, b) -> {
counters.drained++;
return true;
}, 1000);
}

@TearDown(Level.Iteration)
public void emptyQ() {
// If this thread didn't drain, return
if (marker.get() == null) return;
q.clear();
}

// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + BoundedQueueBenchmarks.class.getSimpleName() + ".*")
.build();

new Runner(opt).run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ConsumerMarker() {

@Setup
public void setup() {
q = new ByteBoundedQueue<>(10000, 10000);
q = new ByteBoundedQueue<>(null, null, null, 10000, 10000, 10000);
}

@Benchmark @Group("no_contention") @GroupThreads(1)
Expand Down
29 changes: 13 additions & 16 deletions core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
* @param <S> type of the span, usually {@code zipkin2.Span}
* @since 3.0
*/
public abstract class AsyncReporter<S> extends Component implements Reporter<S>, Closeable, Flushable {
public abstract class AsyncReporter<S> extends Component
implements Reporter<S>, Closeable, Flushable {
public static Builder newBuilder(BytesMessageSender sender) {
return new Builder(sender);
}
Expand Down Expand Up @@ -82,8 +83,8 @@ public static final class Builder {
this.messageMaxBytes = asyncReporter.messageMaxBytes;
this.messageTimeoutNanos = asyncReporter.messageTimeoutNanos;
this.closeTimeoutNanos = asyncReporter.closeTimeoutNanos;
this.queuedMaxSpans = asyncReporter.pending.maxSize;
this.queuedMaxBytes = asyncReporter.pending.maxBytes;
this.queuedMaxSpans = asyncReporter.pending.maxSize();
this.queuedMaxBytes = asyncReporter.queuedMaxBytes;
}

static int onePercentOfMemory() {
Expand Down Expand Up @@ -181,8 +182,9 @@ static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName());
final AtomicBoolean started, closed;
final BytesEncoder<S> encoder;
final ByteBoundedQueue<S> pending;
final BoundedQueue<S> pending;
final BytesMessageSender sender;
final int queuedMaxBytes;
final int messageMaxBytes;
final long messageTimeoutNanos, closeTimeoutNanos;
final CountDownLatch close;
Expand All @@ -193,8 +195,10 @@ static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
private boolean shouldWarnException = true;

BoundedAsyncReporter(Builder builder, BytesEncoder<S> encoder) {
this.pending = new ByteBoundedQueue<S>(builder.queuedMaxSpans, builder.queuedMaxBytes);
this.pending = BoundedQueue.create(encoder, builder.sender, builder.metrics,
builder.messageMaxBytes, builder.queuedMaxSpans, builder.queuedMaxBytes);
this.sender = builder.sender;
this.queuedMaxBytes = builder.queuedMaxBytes;
this.messageMaxBytes = builder.messageMaxBytes;
this.messageTimeoutNanos = builder.messageTimeoutNanos;
this.closeTimeoutNanos = builder.closeTimeoutNanos;
Expand All @@ -216,18 +220,15 @@ void startFlusherThread() {
flushThread.start();
}

@SuppressWarnings("unchecked")
@Override public void report(S next) {
if (next == null) throw new NullPointerException("span == null");
// Lazy start so that reporters never used don't spawn threads
if (started.compareAndSet(false, true)) startFlusherThread();
metrics.incrementSpans(1);
int nextSizeInBytes = encoder.sizeInBytes(next);
int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes);
metrics.incrementSpanBytes(nextSizeInBytes);
if (closed.get() ||
// don't enqueue something larger than we can drain
messageSizeOfNextSpan > messageMaxBytes ||
!pending.offer(next, nextSizeInBytes)) {

// enqueue now and filter our when we drain
if (closed.get() || !pending.offer(next)) {
metrics.incrementSpansDropped(1);
}
}
Expand All @@ -240,10 +241,6 @@ void startFlusherThread() {
void flush(BufferNextMessage<S> bundler) {
pending.drainTo(bundler, bundler.remainingNanos());

// record after flushing reduces the amount of gauge events vs on doing this on report
metrics.updateQueuedSpans(pending.count);
metrics.updateQueuedBytes(pending.sizeInBytes);

// loop around if we are running, and the bundle isn't full
// if we are closed, try to send what's pending
if (!bundler.isReady() && !closed.get()) return;
Expand Down
49 changes: 49 additions & 0 deletions core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/

package zipkin2.reporter.internal;

import zipkin2.reporter.BytesEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.ReporterMetrics;

/**
* Multi-producer, multi-consumer queue that could be bounded by count or/and size.
*/
abstract class BoundedQueue<S> implements SpanWithSizeConsumer<S> {
static <S> BoundedQueue<S> create(BytesEncoder<S> encoder, BytesMessageSender sender,
ReporterMetrics metrics, int messageMaxBytes, int maxSize, int maxBytes) {
if (maxBytes > 0) {
return new ByteBoundedQueue<S>(encoder, sender, metrics, messageMaxBytes, maxSize, maxBytes);
} else {
return new CountBoundedQueue<S>(encoder, sender, metrics, messageMaxBytes, maxSize);
}
}

/**
* Max element's count of this bounded queue
*/
abstract int maxSize();

/**
* Clear this bounded queue
*/
abstract int clear();

/**
* Drains this bounded queue. Blocks for up to nanosTimeout for spans to appear.
* Then, consume as many as possible.
*/
abstract int drainTo(SpanWithSizeConsumer<S> bundler, long remainingNanos);

/** Returns true if the element could be added or false if it could not. */
abstract boolean offer(S next);
}

interface SpanWithSizeConsumer<S> {
/** Returns true if the element could be added or false if it could not due to its size. */
boolean offer(S next, int nextSizeInBytes);
}

Loading

0 comments on commit f30ee66

Please sign in to comment.