Skip to content

Commit

Permalink
Fix wrong dubbo trace caused by using rpcContext.isProviderSide() (#1…
Browse files Browse the repository at this point in the history
…2930)

Co-authored-by: Lauri Tulmin <[email protected]>
Co-authored-by: Lauri Tulmin <[email protected]>
  • Loading branch information
3 people authored Jan 6, 2025
1 parent e12ae97 commit ca3fbac
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
public void injectClasses(ClassInjector injector) {
injector
.proxyBuilder(
"io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryFilter")
"io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryClientFilter")
.inject(InjectionMode.CLASS_ONLY);
injector
.proxyBuilder(
"io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryServerFilter")
.inject(InjectionMode.CLASS_ONLY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,23 @@
import io.opentelemetry.instrumentation.apachedubbo.v2_7.internal.DubboClientNetworkAttributesGetter;
import io.opentelemetry.instrumentation.api.incubator.semconv.net.PeerServiceAttributesExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.AgentCommonConfig;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;

@Activate(group = {"consumer", "provider"})
public class OpenTelemetryFilter implements Filter {
public final class DubboSingletons {
public static final Filter CLIENT_FILTER;
public static final Filter SERVER_FILTER;

private final Filter delegate;

public OpenTelemetryFilter() {
delegate =
static {
DubboTelemetry telemetry =
DubboTelemetry.builder(GlobalOpenTelemetry.get())
.addAttributesExtractor(
PeerServiceAttributesExtractor.create(
new DubboClientNetworkAttributesGetter(),
AgentCommonConfig.get().getPeerServiceResolver()))
.build()
.newFilter();
.build();
CLIENT_FILTER = telemetry.newClientFilter();
SERVER_FILTER = telemetry.newServerFilter();
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) {
return delegate.invoke(invoker, invocation);
}
private DubboSingletons() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7;

import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;

@Activate(group = {"consumer"})
public final class OpenTelemetryClientFilter implements Filter {

private final Filter delegate;

public OpenTelemetryClientFilter() {
delegate = DubboSingletons.CLIENT_FILTER;
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) {
return delegate.invoke(invoker, invocation);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7;

import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;

@Activate(group = {"provider"})
public final class OpenTelemetryServerFilter implements Filter {

private final Filter delegate;

public OpenTelemetryServerFilter() {
delegate = DubboSingletons.SERVER_FILTER;
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) {
return delegate.invoke(invoker, invocation);
}
}
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryFilter
io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryClientFilter
io.opentelemetry.javaagent.instrumentation.apachedubbo.v2_7.OpenTelemetryServerFilter
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
/** Entrypoint for instrumenting Apache Dubbo servers and clients. */
public final class DubboTelemetry {

private final Instrumenter<DubboRequest, Result> serverInstrumenter;
private final Instrumenter<DubboRequest, Result> clientInstrumenter;

/** Returns a new {@link DubboTelemetry} configured with the given {@link OpenTelemetry}. */
public static DubboTelemetry create(OpenTelemetry openTelemetry) {
return builder(openTelemetry).build();
Expand All @@ -25,18 +28,30 @@ public static DubboTelemetryBuilder builder(OpenTelemetry openTelemetry) {
return new DubboTelemetryBuilder(openTelemetry);
}

private final Instrumenter<DubboRequest, Result> serverInstrumenter;
private final Instrumenter<DubboRequest, Result> clientInstrumenter;

DubboTelemetry(
Instrumenter<DubboRequest, Result> serverInstrumenter,
Instrumenter<DubboRequest, Result> clientInstrumenter) {
this.serverInstrumenter = serverInstrumenter;
this.clientInstrumenter = clientInstrumenter;
}

/** Returns a new Dubbo {@link Filter} that traces Dubbo RPC invocations. */
/**
* Returns a new Dubbo {@link Filter} that traces Dubbo RPC invocations.
*
* @deprecated Use {@link #newClientFilter} and {@link #newServerFilter} instead.
*/
@Deprecated
public Filter newFilter() {
return new TracingFilter(serverInstrumenter, clientInstrumenter);
return TracingFilter.newFilter(serverInstrumenter, clientInstrumenter);
}

/** Returns a new Dubbo client {@link Filter} that traces Dubbo RPC invocations. */
public Filter newClientFilter() {
return TracingFilter.newClientFilter(clientInstrumenter);
}

/** Returns a new Dubbo server {@link Filter} that traces Dubbo RPC invocations. */
public Filter newServerFilter() {
return TracingFilter.newServerFilter(serverInstrumenter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;

@Activate(group = {"consumer", "provider"})
public final class OpenTelemetryFilter implements Filter {
@Activate(group = {"consumer"})
public final class OpenTelemetryClientFilter implements Filter {

private final Filter delegate;

public OpenTelemetryFilter() {
delegate = DubboTelemetry.create(GlobalOpenTelemetry.get()).newFilter();
public OpenTelemetryClientFilter() {
delegate = DubboTelemetry.create(GlobalOpenTelemetry.get()).newClientFilter();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.apachedubbo.v2_7;

import io.opentelemetry.api.GlobalOpenTelemetry;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;

@Activate(group = {"provider"})
public final class OpenTelemetryServerFilter implements Filter {

private final Filter delegate;

public OpenTelemetryServerFilter() {
delegate = DubboTelemetry.create(GlobalOpenTelemetry.get()).newServerFilter();
}

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) {
return delegate.invoke(invoker, invocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,52 @@

final class TracingFilter implements Filter {

private final Instrumenter<DubboRequest, Result> serverInstrumenter;
private final Instrumenter<DubboRequest, Result> clientInstrumenter;
private final InstrumenterSupplier instrumenterSupplier;

TracingFilter(
private TracingFilter(InstrumenterSupplier instrumenterSupplier) {
this.instrumenterSupplier = instrumenterSupplier;
}

static TracingFilter newClientFilter(Instrumenter<DubboRequest, Result> clientInstrumenter) {
return newFilter(clientInstrumenter, true);
}

static TracingFilter newServerFilter(Instrumenter<DubboRequest, Result> serverInstrumenter) {
return newFilter(serverInstrumenter, false);
}

private static TracingFilter newFilter(
Instrumenter<DubboRequest, Result> instrumenter, boolean isClientSide) {
return new TracingFilter(
new InstrumenterSupplier() {

@Override
public Instrumenter<DubboRequest, Result> get(RpcContext rpcContext) {
return instrumenter;
}

@Override
public boolean isClientSide(RpcContext rpcContext) {
return isClientSide;
}
});
}

static TracingFilter newFilter(
Instrumenter<DubboRequest, Result> serverInstrumenter,
Instrumenter<DubboRequest, Result> clientInstrumenter) {
this.serverInstrumenter = serverInstrumenter;
this.clientInstrumenter = clientInstrumenter;
return new TracingFilter(
new InstrumenterSupplier() {
@Override
public Instrumenter<DubboRequest, Result> get(RpcContext rpcContext) {
return rpcContext.isConsumerSide() ? clientInstrumenter : serverInstrumenter;
}

@Override
public boolean isClientSide(RpcContext rpcContext) {
return rpcContext.isConsumerSide();
}
});
}

@Override
Expand All @@ -40,9 +78,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) {
return invoker.invoke(invocation);
}

boolean isServer = rpcContext.isProviderSide();
Instrumenter<DubboRequest, Result> instrumenter =
isServer ? serverInstrumenter : clientInstrumenter;
Instrumenter<DubboRequest, Result> instrumenter = instrumenterSupplier.get(rpcContext);
Context parentContext = Context.current();
DubboRequest request = DubboRequest.create((RpcInvocation) invocation, rpcContext);

Expand All @@ -55,7 +91,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) {
boolean isSynchronous = true;
try (Scope ignored = context.makeCurrent()) {
result = invoker.invoke(invocation);
if (!isServer) {
if (instrumenterSupplier.isClientSide(rpcContext)) {
CompletableFuture<Object> future = rpcContext.getCompletableFuture();
if (future != null) {
isSynchronous = false;
Expand All @@ -72,4 +108,10 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) {
}
return result;
}

private interface InstrumenterSupplier {
Instrumenter<DubboRequest, Result> get(RpcContext rpcContext);

boolean isClientSide(RpcContext rpcContext);
}
}
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
io.opentelemetry.instrumentation.apachedubbo.v2_7.OpenTelemetryFilter
io.opentelemetry.instrumentation.apachedubbo.v2_7.OpenTelemetryClientFilter
io.opentelemetry.instrumentation.apachedubbo.v2_7.OpenTelemetryServerFilter

0 comments on commit ca3fbac

Please sign in to comment.