diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/LogProbe.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/LogProbe.java index 9a8c0f59a0b..f07f17cc558 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/LogProbe.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/LogProbe.java @@ -16,12 +16,14 @@ import com.datadog.debugger.sink.DebuggerSink; import com.datadog.debugger.sink.Snapshot; import com.datadog.debugger.util.MoshiHelper; +import com.datadog.debugger.util.WeakIdentityHashMap; import com.squareup.moshi.Json; import com.squareup.moshi.JsonAdapter; import com.squareup.moshi.JsonReader; import com.squareup.moshi.JsonWriter; import com.squareup.moshi.Types; import datadog.trace.api.Config; +import datadog.trace.api.DDTraceId; import datadog.trace.bootstrap.debugger.CapturedContext; import datadog.trace.bootstrap.debugger.CorrelationAccess; import datadog.trace.bootstrap.debugger.DebuggerContext; @@ -49,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +62,8 @@ public class LogProbe extends ProbeDefinition implements Sampled { private static final Limits LIMITS = new Limits(1, 3, 8192, 5); private static final int LOG_MSG_LIMIT = 8192; + public static final int PROBE_BUDGET = 10; + /** Stores part of a templated message either a str or an expression */ public static class Segment { private final String str; @@ -278,6 +283,8 @@ public String toString() { private final Capture capture; private final Sampling sampling; private transient Consumer snapshotProcessor; + protected transient Map budget = + Collections.synchronizedMap(new WeakIdentityHashMap<>()); // no-arg constructor is required by Moshi to avoid creating instance with unsafe and by-passing // constructors, including field initializers. @@ -568,10 +575,12 @@ public void commit( CapturedContext exitContext, List caughtExceptions) { Snapshot snapshot = createSnapshot(); - boolean shouldCommit = fillSnapshot(entryContext, exitContext, caughtExceptions, snapshot); + boolean shouldCommit = + inBudget() && fillSnapshot(entryContext, exitContext, caughtExceptions, snapshot); DebuggerSink sink = DebuggerAgent.getSink(); if (shouldCommit) { commitSnapshot(snapshot, sink); + incrementBudget(); if (snapshotProcessor != null) { snapshotProcessor.accept(snapshot); } @@ -855,6 +864,26 @@ public String toString() { } } + private boolean inBudget() { + AtomicInteger budgetLevel = getBudgetLevel(); + return budgetLevel == null || budgetLevel.get() < PROBE_BUDGET; + } + + private AtomicInteger getBudgetLevel() { + TracerAPI tracer = AgentTracer.get(); + AgentSpan span = tracer != null ? tracer.activeSpan() : null; + return getDebugSessionId() == null || span == null + ? null + : budget.computeIfAbsent(span.getLocalRootSpan().getTraceId(), id -> new AtomicInteger()); + } + + private void incrementBudget() { + AtomicInteger budgetLevel = getBudgetLevel(); + if (budgetLevel != null) { + budgetLevel.incrementAndGet(); + } + } + @Generated @Override public boolean equals(Object o) { diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SnapshotSink.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SnapshotSink.java index 8b47882c8fa..168ef43a19e 100644 --- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SnapshotSink.java +++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SnapshotSink.java @@ -53,6 +53,10 @@ public SnapshotSink(Config config, String tags, BatchUploader snapshotUploader) this.snapshotUploader = snapshotUploader; } + public BlockingQueue getLowRateSnapshots() { + return lowRateSnapshots; + } + public void start() { if (started.compareAndSet(false, true)) { highRateScheduled = diff --git a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/probe/LogProbeTest.java b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/probe/LogProbeTest.java index 778eefb8abb..19019ffa3e6 100644 --- a/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/probe/LogProbeTest.java +++ b/dd-java-agent/agent-debugger/src/test/java/com/datadog/debugger/probe/LogProbeTest.java @@ -43,18 +43,18 @@ public class LogProbeTest { @Test public void testCapture() { - LogProbe.Builder builder = createLog(null); + Builder builder = createLog(null); LogProbe snapshotProbe = builder.capture(1, 420, 255, 20).build(); - Assertions.assertEquals(1, snapshotProbe.getCapture().getMaxReferenceDepth()); - Assertions.assertEquals(420, snapshotProbe.getCapture().getMaxCollectionSize()); - Assertions.assertEquals(255, snapshotProbe.getCapture().getMaxLength()); + assertEquals(1, snapshotProbe.getCapture().getMaxReferenceDepth()); + assertEquals(420, snapshotProbe.getCapture().getMaxCollectionSize()); + assertEquals(255, snapshotProbe.getCapture().getMaxLength()); } @Test public void testSampling() { - LogProbe.Builder builder = createLog(null); + Builder builder = createLog(null); LogProbe snapshotProbe = builder.sampling(0.25).build(); - Assertions.assertEquals(0.25, snapshotProbe.getSampling().getEventsPerSecond(), 0.01); + assertEquals(0.25, snapshotProbe.getSampling().getEventsPerSecond(), 0.01); } @Test @@ -78,6 +78,44 @@ public void noDebugSession() { "With no debug sessions, snapshots should get filled."); } + @Test + public void budgets() { + DebuggerSink sink = new DebuggerSink(getConfig(), mock(ProbeStatusSink.class)); + DebuggerAgentHelper.injectSink(sink); + assertEquals(0, sink.getSnapshotSink().getLowRateSnapshots().size()); + TracerAPI tracer = + CoreTracer.builder().idGenerationStrategy(IdGenerationStrategy.fromName("random")).build(); + AgentTracer.registerIfAbsent(tracer); + int runs = 100; + for (int i = 0; i < runs; i++) { + runTrace(tracer); + } + assertEquals(runs * LogProbe.PROBE_BUDGET, sink.getSnapshotSink().getLowRateSnapshots().size()); + } + + private void runTrace(TracerAPI tracer) { + AgentSpan span = tracer.startSpan("budget testing", "test span"); + span.setTag(Tags.PROPAGATED_DEBUG, "12345:1"); + try (AgentScope scope = tracer.activateSpan(span, ScopeSource.MANUAL)) { + + LogProbe logProbe = + createLog("I'm in a debug session") + .probeId(ProbeId.newId()) + .tags("session_id:12345") + .captureSnapshot(true) + .build(); + + CapturedContext entryContext = capturedContext(span, logProbe); + CapturedContext exitContext = capturedContext(span, logProbe); + logProbe.evaluate(entryContext, new LogStatus(logProbe), MethodLocation.ENTRY); + logProbe.evaluate(exitContext, new LogStatus(logProbe), MethodLocation.EXIT); + + for (int i = 0; i < 20; i++) { + logProbe.commit(entryContext, exitContext, emptyList()); + } + } + } + private boolean fillSnapshot(DebugSessionStatus status) { DebuggerAgentHelper.injectSink(new DebuggerSink(getConfig(), mock(ProbeStatusSink.class))); TracerAPI tracer = @@ -152,12 +190,11 @@ public void fillSnapshot_shouldSend(String methodLocation) { LogProbe logProbe = createLog(null).evaluateAt(MethodLocation.valueOf(methodLocation)).build(); CapturedContext entryContext = new CapturedContext(); CapturedContext exitContext = new CapturedContext(); - LogProbe.LogStatus logEntryStatus = - prepareContext(entryContext, logProbe, MethodLocation.ENTRY); + LogStatus logEntryStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY); logEntryStatus.setSampled(true); // force sampled to avoid rate limiting executing tests! - LogProbe.LogStatus logExitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT); + LogStatus logExitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT); logExitStatus.setSampled(true); // force sampled to avoid rate limiting executing tests! - Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10); + Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10); assertTrue(logProbe.fillSnapshot(entryContext, exitContext, null, snapshot)); } @@ -172,16 +209,16 @@ public void fillSnapshot( LogProbe logProbe = createLog(null).evaluateAt(MethodLocation.EXIT).build(); CapturedContext entryContext = new CapturedContext(); CapturedContext exitContext = new CapturedContext(); - LogProbe.LogStatus entryStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY); + LogStatus entryStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY); fillStatus(entryStatus, sampled, condition, conditionErrors, logTemplateErrors); - LogProbe.LogStatus exitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT); + LogStatus exitStatus = prepareContext(exitContext, logProbe, MethodLocation.EXIT); fillStatus(exitStatus, sampled, condition, conditionErrors, logTemplateErrors); - Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10); + Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10); assertEquals(shouldCommit, logProbe.fillSnapshot(entryContext, exitContext, null, snapshot)); } private void fillStatus( - LogProbe.LogStatus entryStatus, + LogStatus entryStatus, boolean sampled, boolean condition, boolean conditionErrors, @@ -193,10 +230,10 @@ private void fillStatus( entryStatus.setLogTemplateErrors(logTemplateErrors); } - private LogProbe.LogStatus prepareContext( + private LogStatus prepareContext( CapturedContext context, LogProbe logProbe, MethodLocation methodLocation) { context.evaluate(PROBE_ID.getEncodedId(), logProbe, "", 0, methodLocation); - return (LogProbe.LogStatus) context.getStatus(PROBE_ID.getEncodedId()); + return (LogStatus) context.getStatus(PROBE_ID.getEncodedId()); } private static Stream statusValues() { @@ -222,7 +259,7 @@ public void fillSnapshot_shouldSend_exit() { prepareContext(entryContext, logProbe, MethodLocation.ENTRY); CapturedContext exitContext = new CapturedContext(); prepareContext(exitContext, logProbe, MethodLocation.EXIT); - Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10); + Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10); assertTrue(logProbe.fillSnapshot(entryContext, exitContext, null, snapshot)); } @@ -230,7 +267,7 @@ public void fillSnapshot_shouldSend_exit() { public void fillSnapshot_shouldSend_evalErrors() { LogProbe logProbe = createLog(null).evaluateAt(MethodLocation.EXIT).build(); CapturedContext entryContext = new CapturedContext(); - LogProbe.LogStatus logStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY); + LogStatus logStatus = prepareContext(entryContext, logProbe, MethodLocation.ENTRY); logStatus.addError(new EvaluationError("expr", "msg1")); logStatus.setLogTemplateErrors(true); entryContext.addThrowable(new RuntimeException("errorEntry")); @@ -239,7 +276,7 @@ public void fillSnapshot_shouldSend_evalErrors() { logStatus.addError(new EvaluationError("expr", "msg2")); logStatus.setLogTemplateErrors(true); exitContext.addThrowable(new RuntimeException("errorExit")); - Snapshot snapshot = new Snapshot(Thread.currentThread(), logProbe, 10); + Snapshot snapshot = new Snapshot(currentThread(), logProbe, 10); assertTrue(logProbe.fillSnapshot(entryContext, exitContext, null, snapshot)); assertEquals(2, snapshot.getEvaluationErrors().size()); assertEquals("msg1", snapshot.getEvaluationErrors().get(0).getMessage()); @@ -250,7 +287,7 @@ public void fillSnapshot_shouldSend_evalErrors() { "errorExit", snapshot.getCaptures().getReturn().getCapturedThrowable().getMessage()); } - private LogProbe.Builder createLog(String template) { + private Builder createLog(String template) { return LogProbe.builder() .language(LANGUAGE) .probeId(PROBE_ID)