Skip to content

Commit

Permalink
Institute a 10 snapshot per probe per trace budget (#8277)
Browse files Browse the repository at this point in the history
  • Loading branch information
evanchooly authored Jan 24, 2025
1 parent 93b44ea commit 71309b3
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import com.datadog.debugger.sink.DebuggerSink;
import com.datadog.debugger.sink.Snapshot;
import com.datadog.debugger.util.MoshiHelper;
import com.datadog.debugger.util.WeakIdentityHashMap;
import com.squareup.moshi.Json;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import com.squareup.moshi.Types;
import datadog.trace.api.Config;
import datadog.trace.api.DDTraceId;
import datadog.trace.bootstrap.debugger.CapturedContext;
import datadog.trace.bootstrap.debugger.CorrelationAccess;
import datadog.trace.bootstrap.debugger.DebuggerContext;
Expand Down Expand Up @@ -49,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,6 +62,8 @@ public class LogProbe extends ProbeDefinition implements Sampled {
private static final Limits LIMITS = new Limits(1, 3, 8192, 5);
private static final int LOG_MSG_LIMIT = 8192;

public static final int PROBE_BUDGET = 10;

/** Stores part of a templated message either a str or an expression */
public static class Segment {
private final String str;
Expand Down Expand Up @@ -278,6 +283,8 @@ public String toString() {
private final Capture capture;
private final Sampling sampling;
private transient Consumer<Snapshot> snapshotProcessor;
protected transient Map<DDTraceId, AtomicInteger> budget =
Collections.synchronizedMap(new WeakIdentityHashMap<>());

// no-arg constructor is required by Moshi to avoid creating instance with unsafe and by-passing
// constructors, including field initializers.
Expand Down Expand Up @@ -568,10 +575,12 @@ public void commit(
CapturedContext exitContext,
List<CapturedContext.CapturedThrowable> caughtExceptions) {
Snapshot snapshot = createSnapshot();
boolean shouldCommit = fillSnapshot(entryContext, exitContext, caughtExceptions, snapshot);
boolean shouldCommit =
inBudget() && fillSnapshot(entryContext, exitContext, caughtExceptions, snapshot);
DebuggerSink sink = DebuggerAgent.getSink();
if (shouldCommit) {
commitSnapshot(snapshot, sink);
incrementBudget();
if (snapshotProcessor != null) {
snapshotProcessor.accept(snapshot);
}
Expand Down Expand Up @@ -855,6 +864,26 @@ public String toString() {
}
}

private boolean inBudget() {
AtomicInteger budgetLevel = getBudgetLevel();
return budgetLevel == null || budgetLevel.get() < PROBE_BUDGET;
}

private AtomicInteger getBudgetLevel() {
TracerAPI tracer = AgentTracer.get();
AgentSpan span = tracer != null ? tracer.activeSpan() : null;
return getDebugSessionId() == null || span == null
? null
: budget.computeIfAbsent(span.getLocalRootSpan().getTraceId(), id -> new AtomicInteger());
}

private void incrementBudget() {
AtomicInteger budgetLevel = getBudgetLevel();
if (budgetLevel != null) {
budgetLevel.incrementAndGet();
}
}

@Generated
@Override
public boolean equals(Object o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public SnapshotSink(Config config, String tags, BatchUploader snapshotUploader)
this.snapshotUploader = snapshotUploader;
}

public BlockingQueue<Snapshot> getLowRateSnapshots() {
return lowRateSnapshots;
}

public void start() {
if (started.compareAndSet(false, true)) {
highRateScheduled =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ public class LogProbeTest {

@Test
public void testCapture() {
LogProbe.Builder builder = createLog(null);
Builder builder = createLog(null);
LogProbe snapshotProbe = builder.capture(1, 420, 255, 20).build();
Assertions.assertEquals(1, snapshotProbe.getCapture().getMaxReferenceDepth());
Assertions.assertEquals(420, snapshotProbe.getCapture().getMaxCollectionSize());
Assertions.assertEquals(255, snapshotProbe.getCapture().getMaxLength());
assertEquals(1, snapshotProbe.getCapture().getMaxReferenceDepth());
assertEquals(420, snapshotProbe.getCapture().getMaxCollectionSize());
assertEquals(255, snapshotProbe.getCapture().getMaxLength());
}

@Test
public void testSampling() {
LogProbe.Builder builder = createLog(null);
Builder builder = createLog(null);
LogProbe snapshotProbe = builder.sampling(0.25).build();
Assertions.assertEquals(0.25, snapshotProbe.getSampling().getEventsPerSecond(), 0.01);
assertEquals(0.25, snapshotProbe.getSampling().getEventsPerSecond(), 0.01);
}

@Test
Expand All @@ -78,6 +78,44 @@ public void noDebugSession() {
"With no debug sessions, snapshots should get filled.");
}

@Test
public void budgets() {
DebuggerSink sink = new DebuggerSink(getConfig(), mock(ProbeStatusSink.class));
DebuggerAgentHelper.injectSink(sink);
assertEquals(0, sink.getSnapshotSink().getLowRateSnapshots().size());
TracerAPI tracer =
CoreTracer.builder().idGenerationStrategy(IdGenerationStrategy.fromName("random")).build();
AgentTracer.registerIfAbsent(tracer);
int runs = 100;
for (int i = 0; i < runs; i++) {
runTrace(tracer);
}
assertEquals(runs * LogProbe.PROBE_BUDGET, sink.getSnapshotSink().getLowRateSnapshots().size());
}

private void runTrace(TracerAPI tracer) {
AgentSpan span = tracer.startSpan("budget testing", "test span");
span.setTag(Tags.PROPAGATED_DEBUG, "12345:1");
try (AgentScope scope = tracer.activateSpan(span, ScopeSource.MANUAL)) {

LogProbe logProbe =
createLog("I'm in a debug session")
.probeId(ProbeId.newId())
.tags("session_id:12345")
.captureSnapshot(true)
.build();

CapturedContext entryContext = capturedContext(span, logProbe);
CapturedContext exitContext = capturedContext(span, logProbe);
logProbe.evaluate(entryContext, new LogStatus(logProbe), MethodLocation.ENTRY);
logProbe.evaluate(exitContext, new LogStatus(logProbe), MethodLocation.EXIT);

for (int i = 0; i < 20; i++) {
logProbe.commit(entryContext, exitContext, emptyList());
}
}
}

private boolean fillSnapshot(DebugSessionStatus status) {
DebuggerAgentHelper.injectSink(new DebuggerSink(getConfig(), mock(ProbeStatusSink.class)));
TracerAPI tracer =
Expand Down Expand Up @@ -152,12 +190,11 @@ public void fillSnapshot_shouldSend(String methodLocation) {
LogProbe logProbe = createLog(null).evaluateAt(MethodLocation.valueOf(methodLocation)).build();
CapturedContext entryContext = new CapturedContext();
CapturedContext exitContext = new CapturedContext();
LogProbe.LogStatus logEntryStatus =
prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
LogStatus logEntryStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
logEntryStatus.setSampled(true); // force sampled to avoid rate limiting executing tests!
LogProbe.LogStatus logExitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT);
LogStatus logExitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT);
logExitStatus.setSampled(true); // force sampled to avoid rate limiting executing tests!
Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10);
Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10);
assertTrue(logProbe.fillSnapshot(entryContext, exitContext, null, snapshot));
}

Expand All @@ -172,16 +209,16 @@ public void fillSnapshot(
LogProbe logProbe = createLog(null).evaluateAt(MethodLocation.EXIT).build();
CapturedContext entryContext = new CapturedContext();
CapturedContext exitContext = new CapturedContext();
LogProbe.LogStatus entryStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
LogStatus entryStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
fillStatus(entryStatus, sampled, condition, conditionErrors, logTemplateErrors);
LogProbe.LogStatus exitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT);
LogStatus exitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT);
fillStatus(exitStatus, sampled, condition, conditionErrors, logTemplateErrors);
Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10);
Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10);
assertEquals(shouldCommit, logProbe.fillSnapshot(entryContext, exitContext, null, snapshot));
}

private void fillStatus(
LogProbe.LogStatus entryStatus,
LogStatus entryStatus,
boolean sampled,
boolean condition,
boolean conditionErrors,
Expand All @@ -193,10 +230,10 @@ private void fillStatus(
entryStatus.setLogTemplateErrors(logTemplateErrors);
}

private LogProbe.LogStatus prepareContext(
private LogStatus prepareContext(
CapturedContext context, LogProbe logProbe, MethodLocation methodLocation) {
context.evaluate(PROBE_ID.getEncodedId(), logProbe, "", 0, methodLocation);
return (LogProbe.LogStatus) context.getStatus(PROBE_ID.getEncodedId());
return (LogStatus) context.getStatus(PROBE_ID.getEncodedId());
}

private static Stream<Arguments> statusValues() {
Expand All @@ -222,15 +259,15 @@ public void fillSnapshot_shouldSend_exit() {
prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
CapturedContext exitContext = new CapturedContext();
prepareContext(exitContext, logProbe, MethodLocation.EXIT);
Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10);
Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10);
assertTrue(logProbe.fillSnapshot(entryContext, exitContext, null, snapshot));
}

@Test
public void fillSnapshot_shouldSend_evalErrors() {
LogProbe logProbe = createLog(null).evaluateAt(MethodLocation.EXIT).build();
CapturedContext entryContext = new CapturedContext();
LogProbe.LogStatus logStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
LogStatus logStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY);
logStatus.addError(new EvaluationError("expr", "msg1"));
logStatus.setLogTemplateErrors(true);
entryContext.addThrowable(new RuntimeException("errorEntry"));
Expand All @@ -239,7 +276,7 @@ public void fillSnapshot_shouldSend_evalErrors() {
logStatus.addError(new EvaluationError("expr", "msg2"));
logStatus.setLogTemplateErrors(true);
exitContext.addThrowable(new RuntimeException("errorExit"));
Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10);
Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10);
assertTrue(logProbe.fillSnapshot(entryContext, exitContext, null, snapshot));
assertEquals(2, snapshot.getEvaluationErrors().size());
assertEquals("msg1", snapshot.getEvaluationErrors().get(0).getMessage());
Expand All @@ -250,7 +287,7 @@ public void fillSnapshot_shouldSend_evalErrors() {
"errorExit", snapshot.getCaptures().getReturn().getCapturedThrowable().getMessage());
}

private LogProbe.Builder createLog(String template) {
private Builder createLog(String template) {
return LogProbe.builder()
.language(LANGUAGE)
.probeId(PROBE_ID)
Expand Down

0 comments on commit 71309b3

Please sign in to comment.