diff --git a/dd-java-agent/instrumentation/reactive-streams/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java b/dd-java-agent/instrumentation/reactive-streams/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java index c716e56d2c4..5acc5e79d1d 100644 --- a/dd-java-agent/instrumentation/reactive-streams/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/reactive-streams/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java @@ -94,7 +94,7 @@ public static AgentScope onSubscribe( final AgentSpan span = InstrumentationContext.get(Publisher.class, AgentSpan.class).remove(self); final AgentSpan activeSpan = activeSpan(); - if (span == null && activeSpan == null) { + if (s == null || (span == null && activeSpan == null)) { return null; } final AgentSpan current = diff --git a/dd-java-agent/instrumentation/reactive-streams/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java b/dd-java-agent/instrumentation/reactive-streams/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java index a746f4f0986..6da5577618d 100644 --- a/dd-java-agent/instrumentation/reactive-streams/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java +++ b/dd-java-agent/instrumentation/reactive-streams/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java @@ -14,7 +14,6 @@ import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import java.lang.reflect.Method; import java.util.Collections; import java.util.Map; import net.bytebuddy.asm.Advice; @@ -65,8 +64,7 @@ public ElementMatcher hierarchyMatcher() { */ public static class SubscriberDownStreamAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope before( - @Advice.Origin final Method m, @Advice.This final Subscriber self) { + public static AgentScope before(@Advice.This final Subscriber self) { if (activeSpan() != null) { return null; } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy b/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy index ae65a2b72ad..51d9484a424 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy @@ -441,6 +441,15 @@ class ReactorCoreTest extends AgentTestRunner { }) } + def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() { + when: + def mono = Flux.range(1, 100).windowUntil {it % 10 == 0}.count() + then: + // we are not interested into asserting a trace structure but only that the instrumentation error count is 0 + assert mono.block() == 10 + } + + @Trace(operationName = "trace-parent", resourceName = "trace-parent") def assemblePublisherUnderTrace(def publisherSupplier) { def span = startSpan("publisher-parent") diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java index 9a35a4df950..7b9a03a2669 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java @@ -44,7 +44,14 @@ public void methodAdvice(MethodTransformer transformer) { public static class PropagateSpanInScopeAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static AgentScope before(@Advice.This final CoreSubscriber self) { - final Context context = self.currentContext(); + Context context = null; + try { + context = self.currentContext(); + } catch (Throwable ignored) { + } + if (context == null) { + return null; + } if (context.hasKey("dd.span")) { Object maybeSpan = context.get("dd.span"); if (maybeSpan instanceof WithAgentSpan) { diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy b/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy index fa1a0195b4f..9dd1c0616f6 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy @@ -490,6 +490,14 @@ class ReactorCoreTest extends AgentTestRunner { span.finish() } + def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() { + when: + def mono = Flux.range(1, 100).windowUntil {it % 10 == 0}.count() + then: + // we are not interested into asserting a trace structure but only that the instrumentation error count is 0 + assert mono.block() == 11 + } + @Trace(operationName = "addOne", resourceName = "addOne") def static addOneFunc(int i) { return i + 1