Skip to content

Commit

Permalink
Protect currentContext access for reactor inner operators (#7895)
Browse files Browse the repository at this point in the history
  • Loading branch information
amarziali authored Nov 6, 2024
1 parent 42ee2aa commit 92bc5b5
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static AgentScope onSubscribe(
final AgentSpan span =
InstrumentationContext.get(Publisher.class, AgentSpan.class).remove(self);
final AgentSpan activeSpan = activeSpan();
if (span == null && activeSpan == null) {
if (s == null || (span == null && activeSpan == null)) {
return null;
}
final AgentSpan current =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;
Expand Down Expand Up @@ -65,8 +64,7 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
*/
public static class SubscriberDownStreamAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope before(
@Advice.Origin final Method m, @Advice.This final Subscriber self) {
public static AgentScope before(@Advice.This final Subscriber self) {
if (activeSpan() != null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,15 @@ class ReactorCoreTest extends AgentTestRunner {
})
}

def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() {
when:
def mono = Flux.range(1, 100).windowUntil {it % 10 == 0}.count()
then:
// we are not interested into asserting a trace structure but only that the instrumentation error count is 0
assert mono.block() == 10
}
@Trace(operationName = "trace-parent", resourceName = "trace-parent")
def assemblePublisherUnderTrace(def publisherSupplier) {
def span = startSpan("publisher-parent")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@ public void methodAdvice(MethodTransformer transformer) {
public static class PropagateSpanInScopeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope before(@Advice.This final CoreSubscriber<?> self) {
final Context context = self.currentContext();
Context context = null;
try {
context = self.currentContext();
} catch (Throwable ignored) {
}
if (context == null) {
return null;
}
if (context.hasKey("dd.span")) {
Object maybeSpan = context.get("dd.span");
if (maybeSpan instanceof WithAgentSpan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,14 @@ class ReactorCoreTest extends AgentTestRunner {
span.finish()
}

def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() {
when:
def mono = Flux.range(1, 100).windowUntil {it % 10 == 0}.count()
then:
// we are not interested into asserting a trace structure but only that the instrumentation error count is 0
assert mono.block() == 11
}

@Trace(operationName = "addOne", resourceName = "addOne")
def static addOneFunc(int i) {
return i + 1
Expand Down

0 comments on commit 92bc5b5

Please sign in to comment.