diff --git a/CHANGES.md b/CHANGES.md index edb63bbe9d..5e959990cd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -23,6 +23,8 @@ Release Notes. * [doc] Add Spring Gateway Plugin document * [doc] Add 4 menu items guiding users to find important notices for Spring Annotation Plugin, Custom Trace Ignoring Plugin, Kotlin Coroutine Plugin, and Spring Gateway Plugin +* Change context and parent entry span propagation mechanism from gRPC ThreadLocal context to SkyWalking native dynamic + field as new propagation mechanism, to better support async scenarios. All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/222?closed=1) diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java index f970d813f0..5db7248ea3 100644 --- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java @@ -36,9 +36,6 @@ public class ServerInterceptor implements io.grpc.ServerInterceptor { - static final Context.Key CONTEXT_SNAPSHOT_KEY = Context.key("skywalking-grpc-context-snapshot"); - static final Context.Key ACTIVE_SPAN_KEY = Context.key("skywalking-grpc-active-span"); - @Override public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler handler) { @@ -59,15 +56,17 @@ public ServerCall.Listener interceptCall(ServerCall ContextSnapshot contextSnapshot = ContextManager.capture(); AbstractSpan asyncSpan = span.prepareForAsync(); - Context context = Context.current().withValues(CONTEXT_SNAPSHOT_KEY, contextSnapshot, ACTIVE_SPAN_KEY, asyncSpan); + //Context context = Context.current().withValues(CONTEXT_SNAPSHOT_KEY, contextSnapshot, ACTIVE_SPAN_KEY, asyncSpan); ServerCall.Listener listener = Contexts.interceptCall( - context, - new TracingServerCall<>(call), + Context.current(), + new TracingServerCall<>(call, contextSnapshot, asyncSpan), headers, (serverCall, metadata) -> new TracingServerCallListener<>( handler.startCall(serverCall, metadata), - serverCall.getMethodDescriptor() + serverCall.getMethodDescriptor(), + contextSnapshot, + asyncSpan ) ); ContextManager.stopSpan(asyncSpan); diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java index 3361c4b7db..c674b5e16b 100644 --- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java +++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java @@ -18,38 +18,46 @@ package org.apache.skywalking.apm.plugin.grpc.v1.server; -import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.RESPONSE_ON_CLOSE_OPERATION_NAME; -import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.RESPONSE_ON_MESSAGE_OPERATION_NAME; -import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.SERVER; - import io.grpc.ForwardingServerCall; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.Status; import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; import org.apache.skywalking.apm.agent.core.context.tag.Tags; import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; import org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil; +import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.RESPONSE_ON_CLOSE_OPERATION_NAME; +import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.RESPONSE_ON_MESSAGE_OPERATION_NAME; +import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.SERVER; + public class TracingServerCall extends ForwardingServerCall.SimpleForwardingServerCall { private final String operationPrefix; + private final ContextSnapshot contextSnapshot; + private final AbstractSpan parentEntrySpan; - protected TracingServerCall(ServerCall delegate) { + protected TracingServerCall(ServerCall delegate, + final ContextSnapshot contextSnapshot, + final AbstractSpan parentEntrySpan) { super(delegate); this.operationPrefix = OperationNameFormatUtil.formatOperationName(delegate.getMethodDescriptor()) + SERVER; + this.contextSnapshot = contextSnapshot; + this.parentEntrySpan = parentEntrySpan; } @Override public void sendMessage(RESPONSE message) { // We just create the request on message span for server stream calls. if (!getMethodDescriptor().getType().serverSendsOneMessage()) { - final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_MESSAGE_OPERATION_NAME); + final AbstractSpan span = ContextManager.createLocalSpan( + operationPrefix + RESPONSE_ON_MESSAGE_OPERATION_NAME); span.setComponent(ComponentsDefine.GRPC); span.setLayer(SpanLayer.RPC_FRAMEWORK); - ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get()); + ContextManager.continued(contextSnapshot); try { super.sendMessage(message); } catch (Throwable t) { @@ -68,7 +76,7 @@ public void close(Status status, Metadata trailers) { final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_CLOSE_OPERATION_NAME); span.setComponent(ComponentsDefine.GRPC); span.setLayer(SpanLayer.RPC_FRAMEWORK); - ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get()); + ContextManager.continued(contextSnapshot); switch (status.getCode()) { case OK: break; @@ -94,7 +102,7 @@ public void close(Status status, Metadata trailers) { break; } Tags.RPC_RESPONSE_STATUS_CODE.set(span, status.getCode().name()); - ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish(); + parentEntrySpan.asyncFinish(); try { super.close(status, trailers); diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java index cb19e86c6c..65910f003f 100644 --- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java +++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java @@ -22,6 +22,7 @@ import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; @@ -34,11 +35,17 @@ public class TracingServerCallListener extends ForwardingServerCallListener.SimpleForwardingServerCallListener { private final MethodDescriptor.MethodType methodType; private final String operationPrefix; + private final ContextSnapshot contextSnapshot; + private final AbstractSpan parentEntrySpan; - protected TracingServerCallListener(ServerCall.Listener delegate, MethodDescriptor descriptor) { + protected TracingServerCallListener(ServerCall.Listener delegate, MethodDescriptor descriptor, + final ContextSnapshot contextSnapshot, + final AbstractSpan parentEntrySpan) { super(delegate); this.methodType = descriptor.getType(); this.operationPrefix = OperationNameFormatUtil.formatOperationName(descriptor) + SERVER; + this.contextSnapshot = contextSnapshot; + this.parentEntrySpan = parentEntrySpan; } @Override @@ -48,7 +55,7 @@ public void onMessage(REQUEST message) { final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_MESSAGE_OPERATION_NAME); span.setComponent(ComponentsDefine.GRPC); span.setLayer(SpanLayer.RPC_FRAMEWORK); - ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get()); + ContextManager.continued(contextSnapshot); try { super.onMessage(message); } catch (Throwable t) { @@ -67,7 +74,7 @@ public void onCancel() { final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_CANCEL_OPERATION_NAME); span.setComponent(ComponentsDefine.GRPC); span.setLayer(SpanLayer.RPC_FRAMEWORK); - ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get()); + ContextManager.continued(contextSnapshot); try { super.onCancel(); } catch (Throwable t) { @@ -75,7 +82,7 @@ public void onCancel() { throw t; } finally { ContextManager.stopSpan(); - ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish(); + parentEntrySpan.asyncFinish(); } } @@ -84,7 +91,7 @@ public void onHalfClose() { final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_HALF_CLOSE_OPERATION_NAME); span.setComponent(ComponentsDefine.GRPC); span.setLayer(SpanLayer.RPC_FRAMEWORK); - ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get()); + ContextManager.continued(contextSnapshot); try { super.onHalfClose(); } catch (Throwable t) {