From 7358873420fcf33db8ca001d59e71ca40f625cea Mon Sep 17 00:00:00 2001 From: Daniel Zhang Date: Wed, 20 Mar 2024 17:51:13 +0800 Subject: [PATCH 01/10] Add across thread solution for sofarpc --- .../sofarpc/SofaBoltCallbackActivation.java | 102 ++++++++++ .../SofaBoltCallbackConstructInterceptor.java | 32 ++++ .../SofaBoltCallbackExceptionInterceptor.java | 69 +++++++ .../SofaBoltCallbackInvokeInterceptor.java | 62 ++++++ .../src/main/resources/skywalking-plugin.def | 1 + .../SofaBoltCallbackInterceptorTest.java | 179 ++++++++++++++++++ 6 files changed, 445 insertions(+) create mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackActivation.java create mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackConstructInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackExceptionInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackActivation.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackActivation.java new file mode 100644 index 0000000000..983aae8529 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackActivation.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch; + +import static net.bytebuddy.matcher.ElementMatchers.any; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +public class SofaBoltCallbackActivation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "com.alipay.remoting.InvokeCallback"; + private static final String INIT_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackConstructInterceptor"; + private static final String CALL_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackInvokeInterceptor"; + private static final String EXCEPTION_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackExceptionInterceptor"; + private static final String RESPONSE_METHOD_NAME = "onResponse"; + private static final String EXCEPTION_METHOD_NAME = "onException"; + + @Override + protected ClassMatch enhanceClass() { + return HierarchyMatch.byHierarchyMatch(ENHANCE_CLASS); + } + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[] { + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return any(); + } + + @Override + public String getConstructorInterceptor() { + return INIT_METHOD_INTERCEPTOR; + } + } + }; + + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named(RESPONSE_METHOD_NAME).and(takesArguments(1)); + } + + @Override + public String getMethodsInterceptor() { + return CALL_METHOD_INTERCEPTOR; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named(EXCEPTION_METHOD_NAME).and(takesArguments(1)); + } + + @Override + public String getMethodsInterceptor() { + return EXCEPTION_METHOD_INTERCEPTOR; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackConstructInterceptor.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackConstructInterceptor.java new file mode 100644 index 0000000000..4ddb579d53 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackConstructInterceptor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +public class SofaBoltCallbackConstructInterceptor implements InstanceConstructorInterceptor { + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { + if (ContextManager.isActive()) { + objInst.setSkyWalkingDynamicField(ContextManager.capture()); + } + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackExceptionInterceptor.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackExceptionInterceptor.java new file mode 100644 index 0000000000..0727ab9896 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackExceptionInterceptor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +public class SofaBoltCallbackExceptionInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, + Method method, + Object[] allArguments, + Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName()); + ContextSnapshot cachedObjects = (ContextSnapshot) objInst.getSkyWalkingDynamicField(); + if (cachedObjects != null) { + ContextManager.continued(cachedObjects); + } + if (allArguments[0] instanceof Throwable) { + AbstractSpan abstractSpan = ContextManager.activeSpan(); + if (abstractSpan != null) { + abstractSpan.log((Throwable) allArguments[0]); + } + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, + Method method, + Object[] allArguments, + Class[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + // clear ContextSnapshot + objInst.setSkyWalkingDynamicField(null); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, + Method method, + Object[] allArguments, + Class[] argumentsTypes, + Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java new file mode 100644 index 0000000000..b3987e539d --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +public class SofaBoltCallbackInvokeInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, + Method method, + Object[] allArguments, + Class[] argumentsTypes, + MethodInterceptResult result) { + ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName()); + ContextSnapshot cachedObjects = (ContextSnapshot) objInst.getSkyWalkingDynamicField(); + if (cachedObjects != null) { + ContextManager.continued(cachedObjects); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, + Method method, + Object[] allArguments, + Class[] argumentsTypes, + Object ret) { + ContextManager.stopSpan(); + // clear ContextSnapshot + objInst.setSkyWalkingDynamicField(null); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, + Method method, + Object[] allArguments, + Class[] argumentsTypes, + Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def index 9850487d56..4113eb11f0 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def @@ -16,3 +16,4 @@ sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaRpcConsumerInstrumentation sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaRpcProviderInstrumentation +sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackActivation diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java new file mode 100644 index 0000000000..1f4d17291b --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import com.alipay.remoting.InvokeCallback; +import java.lang.reflect.Method; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +@RunWith(TracingSegmentRunner.class) +public class SofaBoltCallbackInterceptorTest { + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule agentServiceRule = new AgentServiceRule(); + @Rule + public MockitoRule rule = MockitoJUnit.rule(); + + private EnhancedInstance enhancedInstance = new EnhancedInstance() { + + private Object object; + + @Override + public Object getSkyWalkingDynamicField() { + return object; + } + + @Override + public void setSkyWalkingDynamicField(Object value) { + this.object = value; + } + }; + + @Mock + private ContextSnapshot contextSnapshot; + private final Executor executor = Executors.newFixedThreadPool(1); + + private SofaBoltCallbackConstructInterceptor constructInterceptor; + private SofaBoltCallbackExceptionInterceptor exceptionInterceptor; + private SofaBoltCallbackInvokeInterceptor invokeInterceptor; + + private Object[] arguments; + private Object[] throwableArgs; + + private Method responseMethod; + private Method exceptionMethod; + private InvokeCallback callback; + + @Before + public void before() throws NoSuchMethodException { + constructInterceptor = new SofaBoltCallbackConstructInterceptor(); + exceptionInterceptor = new SofaBoltCallbackExceptionInterceptor(); + invokeInterceptor = new SofaBoltCallbackInvokeInterceptor(); + callback = new InvokeCallback() { + + @Override + public void onResponse(Object o) { + ContextManager.createLocalSpan("TestBegin"); + ContextManager.stopSpan(); + } + + @Override + public void onException(Throwable throwable) { + + } + + @Override + public Executor getExecutor() { + return null; + } + }; + + responseMethod = callback.getClass().getMethod("onResponse", Object.class); + exceptionMethod = callback.getClass().getMethod("onException", Throwable.class); + arguments = new Object[] {new Object()}; + throwableArgs = new Object[] {new NullPointerException()}; + } + + @Test + public void testOnConstructor() { + constructInterceptor.onConstruct(enhancedInstance, null); + Assert.assertNull(enhancedInstance.getSkyWalkingDynamicField()); + } + + @Test + public void testResponse() throws Throwable { + enhancedInstance.setSkyWalkingDynamicField(contextSnapshot); + invokeInterceptor.beforeMethod(enhancedInstance, responseMethod, arguments, null, null); + invokeInterceptor.afterMethod(enhancedInstance, responseMethod, arguments, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + } + + @Test + public void testException() throws Throwable { + enhancedInstance.setSkyWalkingDynamicField(contextSnapshot); + exceptionInterceptor.beforeMethod(enhancedInstance, exceptionMethod, throwableArgs, null, null); + exceptionInterceptor.afterMethod(enhancedInstance, exceptionMethod, throwableArgs, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + } + + @Test + public void testResponseIntegrally() throws InterruptedException { + AbstractSpan testBegin = ContextManager.createLocalSpan("TestBegin"); + constructInterceptor.onConstruct(enhancedInstance, null); + CountDownLatch countDownLatch = new CountDownLatch(1); + ContextManager.stopSpan(testBegin); + + executor.execute(() -> { + invokeInterceptor.beforeMethod(enhancedInstance, responseMethod, arguments, null, null); + callback.onResponse(new Object()); + invokeInterceptor.afterMethod(enhancedInstance, responseMethod, arguments, null, null); + countDownLatch.countDown(); + }); + + countDownLatch.await(1000, TimeUnit.MILLISECONDS); + + assertEquals(2, segmentStorage.getTraceSegments().size()); + TraceSegment traceSegment1 = segmentStorage.getTraceSegments().get(0); + List spans1 = SegmentHelper.getSpans(traceSegment1); + assertEquals(1, spans1.size()); + TraceSegment traceSegment2 = segmentStorage.getTraceSegments().get(1); + List spans2 = SegmentHelper.getSpans(traceSegment2); + assertEquals(2, spans2.size()); + assertEquals("TestBegin", spans2.get(0).getOperationName()); + } + +} From 7d9e11ddd6b0a3fa44f691edc6a777f52c39ff55 Mon Sep 17 00:00:00 2001 From: Daniel Zhang Date: Wed, 20 Mar 2024 17:58:06 +0800 Subject: [PATCH 02/10] Change CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 11dcc86166..5627ea6ad0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -18,6 +18,7 @@ Release Notes. * Archive the expired plugins `impala-jdbc-2.6.x-plugin`. * Fix a bug in Spring Cloud Gateway if HttpClientFinalizer#send does not invoke, the span created at NettyRoutingFilterInterceptor can not stop. * Fix not tracing in HttpClient v5 when HttpHost(arg[0]) is null but `RoutingSupport#determineHost` works. +* Support across thread tracing for SOFA-RPC. #### Documentation * Update docs to describe `expired-plugins`. From 9393f887a1184cae892ef4f7a52bba660090cd30 Mon Sep 17 00:00:00 2001 From: Daniel Zhang Date: Thu, 21 Mar 2024 17:26:25 +0800 Subject: [PATCH 03/10] Add sofarpc test casees --- .../SofaBoltCallbackInterceptorTest.java | 6 +- .../sofarpc-scenario/config/expectedData.yaml | 78 +++++++++++++++++++ .../testcase/sofarpc/SofaRpcApplication.java | 14 +++- .../sofarpc/callback/TestCallback.java | 48 ++++++++++++ .../sofarpc/controller/CaseController.java | 13 ++++ .../interfaces/SofaRpcDemoService.java | 4 + .../service/SofaRpcDemoServiceImpl.java | 10 +++ 7 files changed, 169 insertions(+), 4 deletions(-) create mode 100644 test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java index 1f4d17291b..071c6135fa 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java @@ -99,7 +99,7 @@ public void before() throws NoSuchMethodException { @Override public void onResponse(Object o) { - ContextManager.createLocalSpan("TestBegin"); + ContextManager.createLocalSpan("onResponse"); ContextManager.stopSpan(); } @@ -173,7 +173,9 @@ public void testResponseIntegrally() throws InterruptedException { TraceSegment traceSegment2 = segmentStorage.getTraceSegments().get(1); List spans2 = SegmentHelper.getSpans(traceSegment2); assertEquals(2, spans2.size()); - assertEquals("TestBegin", spans2.get(0).getOperationName()); + assertEquals("onResponse", spans2.get(0).getOperationName()); + + assertEquals(traceSegment1.getRelatedGlobalTrace().getId(),traceSegment2.getRef().getTraceId()); } } diff --git a/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml b/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml index f8d0f4f807..2a9155e765 100644 --- a/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml +++ b/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml @@ -17,6 +17,23 @@ segmentItems: - serviceName: sofarpc-scenario segmentSize: gt 2 segments: + - segmentId: not null + spans: + - operationName: HEAD:/sofarpc-scenario/case/healthCheck + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: nq 0 + endTime: nq 0 + componentId: 1 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: url, value: 'http://localhost:8080/sofarpc-scenario/case/healthCheck'} + - {key: http.method, value: HEAD} + - {key: http.status_code, value: '200'} - segmentId: not null spans: - operationName: org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.hello(java.lang.String) @@ -34,6 +51,37 @@ segmentItems: refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: sofarpc-scenario, traceId: not null} skipAnalysis: 'false' + - segmentId: not null + spans: + - operationName: org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.onAppResponse(java.lang.String) + parentSpanId: 0 + spanId: 1 + spanLayer: RPCFramework + startTime: nq 0 + endTime: nq 0 + componentId: 43 + isError: false + spanType: Exit + peer: 127.0.0.1:12200 + skipAnalysis: false + tags: + - {key: url, value: 'bolt://127.0.0.1:12200/org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.onAppResponse(java.lang.String)'} + - operationName: Thread/com.alipay.sofa.rpc.message.bolt.BoltInvokerCallback/onResponse + parentSpanId: -1 + spanId: 0 + spanLayer: Unknown + startTime: nq 0 + endTime: nq 0 + componentId: 0 + isError: false + spanType: Local + peer: '' + skipAnalysis: false + refs: + - {parentEndpoint: 'GET:/sofarpc-scenario/case/sofarpc', networkAddress: '', + refType: CrossThread, parentSpanId: 2, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: sofarpc-scenario, + traceId: not null} - segmentId: not null spans: - operationName: org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.hello(java.lang.String) @@ -49,6 +97,19 @@ segmentItems: tags: - {key: url, value: 'bolt://127.0.0.1:12200/org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.hello(java.lang.String)'} skipAnalysis: 'false' + - operationName: org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.callback(java.lang.String) + parentSpanId: 0 + spanId: 2 + spanLayer: RPCFramework + startTime: nq 0 + endTime: nq 0 + componentId: 43 + isError: false + spanType: Exit + peer: 127.0.0.1:12200 + tags: + - { key: url, value: 'bolt://127.0.0.1:12200/org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.callback(java.lang.String)' } + skipAnalysis: 'false' - operationName: GET:/sofarpc-scenario/case/sofarpc parentSpanId: -1 spanId: 0 @@ -64,3 +125,20 @@ segmentItems: - {key: http.method, value: GET} - {key: http.status_code, value: '200'} skipAnalysis: 'false' + - segmentId: not null + spans: + - operationName: org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.callback(java.lang.String) + parentSpanId: -1 + spanId: 0 + spanLayer: RPCFramework + startTime: nq 0 + endTime: nq 0 + componentId: 43 + isError: false + spanType: Entry + peer: '' + refs: + - { parentEndpoint: GET:/sofarpc-scenario/case/sofarpc, networkAddress: '127.0.0.1:12200', + refType: CrossProcess, parentSpanId: 2, parentTraceSegmentId: not null, parentServiceInstance: not + null, parentService: sofarpc-scenario, traceId: not null } + skipAnalysis: 'false' diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java index faa8bb8639..609a0e5266 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java @@ -18,6 +18,7 @@ package org.apache.skywalking.apm.testcase.sofarpc; +import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.ServerConfig; @@ -42,8 +43,9 @@ public static class SofaRpcConfiguration { public ProviderConfig provider() { ServerConfig config = new ServerConfig().setProtocol("bolt").setPort(12200).setDaemon(true); - ProviderConfig providerConfig = new ProviderConfig().setInterfaceId(SofaRpcDemoService.class - .getName()).setRef(new SofaRpcDemoServiceImpl()).setServer(config); + ProviderConfig providerConfig = new ProviderConfig().setInterfaceId( + SofaRpcDemoService.class + .getName()).setRef(new SofaRpcDemoServiceImpl()).setServer(config); providerConfig.export(); return providerConfig; @@ -55,5 +57,13 @@ public ConsumerConfig consumer() { .setProtocol("bolt") .setDirectUrl("bolt://127.0.0.1:12200"); } + + @Bean + public ConsumerConfig callbackConsumer() { + return new ConsumerConfig().setInterfaceId(SofaRpcDemoService.class.getName()) + .setProtocol("bolt") + .setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK) + .setDirectUrl("bolt://127.0.0.1:12200"); + } } } diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java new file mode 100644 index 0000000000..34fc621e47 --- /dev/null +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.testcase.sofarpc.callback; + +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback; +import com.alipay.sofa.rpc.core.request.RequestBase; +import org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService; + +public class TestCallback implements SofaResponseCallback { + + private SofaRpcDemoService service; + + public TestCallback(final SofaRpcDemoService service) { + this.service = service; + } + + @Override + public void onAppResponse(final Object o, final String s, final RequestBase requestBase) { + service.onAppResponse("hello"); + } + + @Override + public void onAppException(final Throwable throwable, final String s, final RequestBase requestBase) { + + } + + @Override + public void onSofaException(final SofaRpcException e, final String s, final RequestBase requestBase) { + + } +} diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java index ca9320001e..fff7b5ac34 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java @@ -19,8 +19,11 @@ package org.apache.skywalking.apm.testcase.sofarpc.controller; import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import org.apache.skywalking.apm.testcase.sofarpc.callback.TestCallback; import org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @@ -32,8 +35,13 @@ public class CaseController { private static final String SUCCESS = "Success"; @Autowired + @Qualifier("consumer") private ConsumerConfig consumerConfig; + @Autowired + @Qualifier("callbackConsumer") + private ConsumerConfig callbackConsumer; + @RequestMapping("/healthCheck") @ResponseBody public String healthCheck() { @@ -45,6 +53,11 @@ public String healthCheck() { public String sofarpc() { SofaRpcDemoService service = consumerConfig.refer(); service.hello("sofarpc"); + + SofaRpcDemoService callbackService = callbackConsumer.refer(); + RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext(); + invokeCtx.setResponseCallback(new TestCallback(service)); + callbackService.callback("sofarpc"); return SUCCESS; } } diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java index cd541109b7..2c2ef3b605 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java @@ -21,4 +21,8 @@ public interface SofaRpcDemoService { String hello(String name); + + String onAppResponse(String name); + + String callback(String name); } diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java index 9fed6ddc3d..14dbb30c8a 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java @@ -25,4 +25,14 @@ public class SofaRpcDemoServiceImpl implements SofaRpcDemoService { public String hello(String name) { return "hello, " + name; } + + @Override + public String onAppResponse(String name) { + return "hello, " + name; + } + + @Override + public String callback(String name) { + return "hello, " + name; + } } From 0840582e61a8ed7152accfb132a98f07b19d4a26 Mon Sep 17 00:00:00 2001 From: Daniel Zhang Date: Thu, 21 Mar 2024 17:40:33 +0800 Subject: [PATCH 04/10] reformat code --- .../apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java index 071c6135fa..895c4030ce 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java @@ -175,7 +175,7 @@ public void testResponseIntegrally() throws InterruptedException { assertEquals(2, spans2.size()); assertEquals("onResponse", spans2.get(0).getOperationName()); - assertEquals(traceSegment1.getRelatedGlobalTrace().getId(),traceSegment2.getRef().getTraceId()); + assertEquals(traceSegment1.getRelatedGlobalTrace().getId(), traceSegment2.getRef().getTraceId()); } } From 40d7b27f7561cb2345b74aa7f5c900e3c8f58592 Mon Sep 17 00:00:00 2001 From: Daniel Zhang Date: Thu, 21 Mar 2024 22:32:30 +0800 Subject: [PATCH 05/10] Simplify sofarpc-scenario test case --- .../sofarpc-scenario/config/expectedData.yaml | 30 ------------------- .../sofarpc/callback/TestCallback.java | 1 - .../interfaces/SofaRpcDemoService.java | 2 -- .../service/SofaRpcDemoServiceImpl.java | 5 ---- 4 files changed, 38 deletions(-) diff --git a/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml b/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml index 2a9155e765..c6655d281b 100644 --- a/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml +++ b/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml @@ -17,23 +17,6 @@ segmentItems: - serviceName: sofarpc-scenario segmentSize: gt 2 segments: - - segmentId: not null - spans: - - operationName: HEAD:/sofarpc-scenario/case/healthCheck - parentSpanId: -1 - spanId: 0 - spanLayer: Http - startTime: nq 0 - endTime: nq 0 - componentId: 1 - isError: false - spanType: Entry - peer: '' - skipAnalysis: false - tags: - - {key: url, value: 'http://localhost:8080/sofarpc-scenario/case/healthCheck'} - - {key: http.method, value: HEAD} - - {key: http.status_code, value: '200'} - segmentId: not null spans: - operationName: org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.hello(java.lang.String) @@ -53,19 +36,6 @@ segmentItems: skipAnalysis: 'false' - segmentId: not null spans: - - operationName: org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.onAppResponse(java.lang.String) - parentSpanId: 0 - spanId: 1 - spanLayer: RPCFramework - startTime: nq 0 - endTime: nq 0 - componentId: 43 - isError: false - spanType: Exit - peer: 127.0.0.1:12200 - skipAnalysis: false - tags: - - {key: url, value: 'bolt://127.0.0.1:12200/org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.onAppResponse(java.lang.String)'} - operationName: Thread/com.alipay.sofa.rpc.message.bolt.BoltInvokerCallback/onResponse parentSpanId: -1 spanId: 0 diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java index 34fc621e47..81da5ba588 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java @@ -33,7 +33,6 @@ public TestCallback(final SofaRpcDemoService service) { @Override public void onAppResponse(final Object o, final String s, final RequestBase requestBase) { - service.onAppResponse("hello"); } @Override diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java index 2c2ef3b605..f2d0f93e9e 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java @@ -22,7 +22,5 @@ public interface SofaRpcDemoService { String hello(String name); - String onAppResponse(String name); - String callback(String name); } diff --git a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java index 14dbb30c8a..b946932117 100644 --- a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java +++ b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java @@ -26,11 +26,6 @@ public String hello(String name) { return "hello, " + name; } - @Override - public String onAppResponse(String name) { - return "hello, " + name; - } - @Override public String callback(String name) { return "hello, " + name; From 8680f820505703a24a5c08fea5b3230719dd574e Mon Sep 17 00:00:00 2001 From: Daniel Zhang Date: Sun, 24 Mar 2024 17:28:43 +0800 Subject: [PATCH 06/10] Add callback wrapper to ensure contextSnapshot is one time used. --- .../plugin/sofarpc/InvokeCallbackWrapper.java | 93 +++++++++ .../SofaBoltCallbackConstructInterceptor.java | 32 ---- .../SofaBoltCallbackExceptionInterceptor.java | 69 ------- ...a => SofaBoltCallbackInstrumentation.java} | 54 ++---- .../SofaBoltCallbackInvokeInterceptor.java | 13 +- .../src/main/resources/skywalking-plugin.def | 2 +- .../sofarpc/InvokeCallbackWrapperTest.java | 169 ++++++++++++++++ .../SofaBoltCallbackInterceptorTest.java | 181 ------------------ ...SofaBoltCallbackInvokeInterceptorTest.java | 79 ++++++++ .../SofaRpcConsumerInterceptorTest.java | 40 ++-- .../SofaRpcProviderInterceptorTest.java | 19 +- 11 files changed, 390 insertions(+), 361 deletions(-) create mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java delete mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackConstructInterceptor.java delete mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackExceptionInterceptor.java rename apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/{SofaBoltCallbackActivation.java => SofaBoltCallbackInstrumentation.java} (52%) create mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapperTest.java delete mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java create mode 100644 apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptorTest.java diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java new file mode 100644 index 0000000000..d34b91d5f7 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import com.alipay.remoting.InvokeCallback; +import java.util.concurrent.Executor; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; + +public class InvokeCallbackWrapper implements InvokeCallback { + + private ContextSnapshot contextSnapshot; + private final InvokeCallback invokeCallback; + + public InvokeCallbackWrapper(InvokeCallback invokeCallback) { + if (ContextManager.isActive()) { + this.contextSnapshot = ContextManager.capture(); + } + this.invokeCallback = invokeCallback; + + } + + @Override + public void onResponse(final Object o) { + ContextManager.createLocalSpan("Thread/" + invokeCallback.getClass().getName() + "/onResponse"); + if (contextSnapshot != null) { + ContextManager.continued(contextSnapshot); + } + try { + invokeCallback.onResponse(o); + } catch (Throwable t) { + ContextManager.activeSpan().log(t); + throw t; + } finally { + ContextManager.stopSpan(); + contextSnapshot = null; + } + + } + + @Override + public void onException(final Throwable throwable) { + ContextManager.createLocalSpan("Thread/" + invokeCallback.getClass().getName() + "/onException"); + if (contextSnapshot != null) { + ContextManager.continued(contextSnapshot); + } + if (throwable != null) { + AbstractSpan abstractSpan = ContextManager.activeSpan(); + if (abstractSpan != null) { + abstractSpan.log(throwable); + } + } + try { + invokeCallback.onException(throwable); + } catch (Throwable t) { + ContextManager.activeSpan().log(t); + throw t; + } finally { + ContextManager.stopSpan(); + contextSnapshot = null; + } + } + + @Override + public Executor getExecutor() { + return invokeCallback.getExecutor(); + } + + protected ContextSnapshot getContextSnapshot() { + return contextSnapshot; + } + + protected InvokeCallback getInvokeCallback() { + return invokeCallback; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackConstructInterceptor.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackConstructInterceptor.java deleted file mode 100644 index 4ddb579d53..0000000000 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackConstructInterceptor.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.apm.plugin.sofarpc; - -import org.apache.skywalking.apm.agent.core.context.ContextManager; -import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; -import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; - -public class SofaBoltCallbackConstructInterceptor implements InstanceConstructorInterceptor { - @Override - public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { - if (ContextManager.isActive()) { - objInst.setSkyWalkingDynamicField(ContextManager.capture()); - } - } -} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackExceptionInterceptor.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackExceptionInterceptor.java deleted file mode 100644 index 0727ab9896..0000000000 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackExceptionInterceptor.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.apm.plugin.sofarpc; - -import java.lang.reflect.Method; -import org.apache.skywalking.apm.agent.core.context.ContextManager; -import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; -import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; -import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; -import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; -import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; - -public class SofaBoltCallbackExceptionInterceptor implements InstanceMethodsAroundInterceptor { - @Override - public void beforeMethod(EnhancedInstance objInst, - Method method, - Object[] allArguments, - Class[] argumentsTypes, - MethodInterceptResult result) throws Throwable { - ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName()); - ContextSnapshot cachedObjects = (ContextSnapshot) objInst.getSkyWalkingDynamicField(); - if (cachedObjects != null) { - ContextManager.continued(cachedObjects); - } - if (allArguments[0] instanceof Throwable) { - AbstractSpan abstractSpan = ContextManager.activeSpan(); - if (abstractSpan != null) { - abstractSpan.log((Throwable) allArguments[0]); - } - } - } - - @Override - public Object afterMethod(EnhancedInstance objInst, - Method method, - Object[] allArguments, - Class[] argumentsTypes, - Object ret) throws Throwable { - ContextManager.stopSpan(); - // clear ContextSnapshot - objInst.setSkyWalkingDynamicField(null); - return ret; - } - - @Override - public void handleMethodException(EnhancedInstance objInst, - Method method, - Object[] allArguments, - Class[] argumentsTypes, - Throwable t) { - ContextManager.activeSpan().log(t); - } -} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackActivation.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInstrumentation.java similarity index 52% rename from apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackActivation.java rename to apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInstrumentation.java index 983aae8529..6547042bc6 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackActivation.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInstrumentation.java @@ -24,42 +24,25 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; -import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch; +import org.apache.skywalking.apm.agent.core.plugin.match.NameMatch; -import static net.bytebuddy.matcher.ElementMatchers.any; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -public class SofaBoltCallbackActivation extends ClassInstanceMethodsEnhancePluginDefine { +public class SofaBoltCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { - private static final String ENHANCE_CLASS = "com.alipay.remoting.InvokeCallback"; - private static final String INIT_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackConstructInterceptor"; - private static final String CALL_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackInvokeInterceptor"; - private static final String EXCEPTION_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackExceptionInterceptor"; - private static final String RESPONSE_METHOD_NAME = "onResponse"; - private static final String EXCEPTION_METHOD_NAME = "onException"; + private static final String ENHANCE_CLASS = "com.alipay.remoting.BaseRemoting"; + private static final String INVOKE_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackInvokeInterceptor"; + private static final String INVOKE_METHOD = "invokeWithCallback"; @Override protected ClassMatch enhanceClass() { - return HierarchyMatch.byHierarchyMatch(ENHANCE_CLASS); + return NameMatch.byName(ENHANCE_CLASS); } @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { - return new ConstructorInterceptPoint[] { - new ConstructorInterceptPoint() { - @Override - public ElementMatcher getConstructorMatcher() { - return any(); - } - - @Override - public String getConstructorInterceptor() { - return INIT_METHOD_INTERCEPTOR; - } - } - }; - + return null; } @Override @@ -68,33 +51,18 @@ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { new InstanceMethodsInterceptPoint() { @Override public ElementMatcher getMethodsMatcher() { - return named(RESPONSE_METHOD_NAME).and(takesArguments(1)); - } - - @Override - public String getMethodsInterceptor() { - return CALL_METHOD_INTERCEPTOR; - } - - @Override - public boolean isOverrideArgs() { - return false; - } - }, - new InstanceMethodsInterceptPoint() { - @Override - public ElementMatcher getMethodsMatcher() { - return named(EXCEPTION_METHOD_NAME).and(takesArguments(1)); + return named(INVOKE_METHOD).and( + takesArguments(4)); } @Override public String getMethodsInterceptor() { - return EXCEPTION_METHOD_INTERCEPTOR; + return INVOKE_METHOD_INTERCEPTOR; } @Override public boolean isOverrideArgs() { - return false; + return true; } } }; diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java index b3987e539d..c890b4a90d 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java @@ -18,9 +18,8 @@ package org.apache.skywalking.apm.plugin.sofarpc; +import com.alipay.remoting.InvokeCallback; import java.lang.reflect.Method; -import org.apache.skywalking.apm.agent.core.context.ContextManager; -import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; @@ -32,10 +31,8 @@ public void beforeMethod(EnhancedInstance objInst, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) { - ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName()); - ContextSnapshot cachedObjects = (ContextSnapshot) objInst.getSkyWalkingDynamicField(); - if (cachedObjects != null) { - ContextManager.continued(cachedObjects); + if (allArguments[2] instanceof InvokeCallback) { + allArguments[2] = new InvokeCallbackWrapper((InvokeCallback) allArguments[2]); } } @@ -45,9 +42,6 @@ public Object afterMethod(EnhancedInstance objInst, Object[] allArguments, Class[] argumentsTypes, Object ret) { - ContextManager.stopSpan(); - // clear ContextSnapshot - objInst.setSkyWalkingDynamicField(null); return ret; } @@ -57,6 +51,5 @@ public void handleMethodException(EnhancedInstance objInst, Object[] allArguments, Class[] argumentsTypes, Throwable t) { - ContextManager.activeSpan().log(t); } } diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def index 4113eb11f0..72682ac2bc 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def @@ -16,4 +16,4 @@ sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaRpcConsumerInstrumentation sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaRpcProviderInstrumentation -sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackActivation +sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackInstrumentation diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapperTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapperTest.java new file mode 100644 index 0000000000..ef9c9f3cb0 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapperTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import com.alipay.remoting.InvokeCallback; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.helper.SpanHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +@RunWith(TracingSegmentRunner.class) +public class InvokeCallbackWrapperTest { + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + private Executor executor = Executors.newFixedThreadPool(1); + + @Rule + public AgentServiceRule agentServiceRule = new AgentServiceRule(); + @Rule + public MockitoRule rule = MockitoJUnit.rule(); + + private InvokeCallback callback; + + @Before + public void before() { + callback = new InvokeCallback() { + @Override + public void onResponse(final Object o) { + } + + @Override + public void onException(final Throwable throwable) { + } + + @Override + public Executor getExecutor() { + return null; + } + }; + } + + static class WrapperWrapper implements InvokeCallback { + + private InvokeCallback callback; + + private CountDownLatch countDownLatch; + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public WrapperWrapper(InvokeCallback callback) { + this.countDownLatch = new CountDownLatch(1); + this.callback = callback; + } + + @Override + public void onResponse(final Object o) { + callback.onResponse(o); + countDownLatch.countDown(); + } + + @Override + public void onException(final Throwable throwable) { + callback.onException(throwable); + countDownLatch.countDown(); + } + + @Override + public Executor getExecutor() { + return null; + } + } + + @Test + public void testConstruct() { + InvokeCallbackWrapper wrapper = new InvokeCallbackWrapper(callback); + Assert.assertSame(callback, wrapper.getInvokeCallback()); + Assert.assertNull(wrapper.getContextSnapshot()); + + ContextManager.createEntrySpan("sofarpc", null); + wrapper = new InvokeCallbackWrapper(callback); + Assert.assertSame(callback, wrapper.getInvokeCallback()); + Assert.assertEquals(ContextManager.getGlobalTraceId(), wrapper.getContextSnapshot().getTraceId().getId()); + Assert.assertEquals("sofarpc", wrapper.getContextSnapshot().getParentEndpoint()); + ContextManager.stopSpan(); + } + + @Test + public void testOnResponse() throws InterruptedException { + ContextManager.createEntrySpan("sofarpc", null); + InvokeCallbackWrapper wrapper = new InvokeCallbackWrapper(callback); + final WrapperWrapper wrapperWrapper = new WrapperWrapper(wrapper); + executor.execute(() -> wrapperWrapper.onResponse(null)); + ContextManager.stopSpan(); + wrapperWrapper.getCountDownLatch().await(); + + assertThat(segmentStorage.getTraceSegments().size(), is(2)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + TraceSegment traceSegment2 = segmentStorage.getTraceSegments().get(1); + List spans2 = SegmentHelper.getSpans(traceSegment2); + assertThat(spans2.size(), is(1)); + assertEquals("sofarpc", traceSegment2.getRef().getParentEndpoint()); + } + + @Test + public void testOnException() throws InterruptedException { + ContextManager.createEntrySpan("sofarpc", null); + InvokeCallbackWrapper wrapper = new InvokeCallbackWrapper(callback); + final WrapperWrapper wrapperWrapper = new WrapperWrapper(wrapper); + final Throwable throwable = new Throwable(); + executor.execute(() -> wrapperWrapper.onException(throwable)); + ContextManager.stopSpan(); + wrapperWrapper.getCountDownLatch().await(); + + assertThat(segmentStorage.getTraceSegments().size(), is(2)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + TraceSegment traceSegment2 = segmentStorage.getTraceSegments().get(1); + List spans2 = SegmentHelper.getSpans(traceSegment2); + assertThat(spans2.size(), is(1)); + assertThat(SpanHelper.getLogs(spans2.get(0)).size(), is(1)); + + } + +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java deleted file mode 100644 index 895c4030ce..0000000000 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInterceptorTest.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.apm.plugin.sofarpc; - -import com.alipay.remoting.InvokeCallback; -import java.lang.reflect.Method; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.apache.skywalking.apm.agent.core.context.ContextManager; -import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; -import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; -import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; -import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; -import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; -import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; -import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; -import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; -import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; -import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -@RunWith(TracingSegmentRunner.class) -public class SofaBoltCallbackInterceptorTest { - - @SegmentStoragePoint - private SegmentStorage segmentStorage; - - @Rule - public AgentServiceRule agentServiceRule = new AgentServiceRule(); - @Rule - public MockitoRule rule = MockitoJUnit.rule(); - - private EnhancedInstance enhancedInstance = new EnhancedInstance() { - - private Object object; - - @Override - public Object getSkyWalkingDynamicField() { - return object; - } - - @Override - public void setSkyWalkingDynamicField(Object value) { - this.object = value; - } - }; - - @Mock - private ContextSnapshot contextSnapshot; - private final Executor executor = Executors.newFixedThreadPool(1); - - private SofaBoltCallbackConstructInterceptor constructInterceptor; - private SofaBoltCallbackExceptionInterceptor exceptionInterceptor; - private SofaBoltCallbackInvokeInterceptor invokeInterceptor; - - private Object[] arguments; - private Object[] throwableArgs; - - private Method responseMethod; - private Method exceptionMethod; - private InvokeCallback callback; - - @Before - public void before() throws NoSuchMethodException { - constructInterceptor = new SofaBoltCallbackConstructInterceptor(); - exceptionInterceptor = new SofaBoltCallbackExceptionInterceptor(); - invokeInterceptor = new SofaBoltCallbackInvokeInterceptor(); - callback = new InvokeCallback() { - - @Override - public void onResponse(Object o) { - ContextManager.createLocalSpan("onResponse"); - ContextManager.stopSpan(); - } - - @Override - public void onException(Throwable throwable) { - - } - - @Override - public Executor getExecutor() { - return null; - } - }; - - responseMethod = callback.getClass().getMethod("onResponse", Object.class); - exceptionMethod = callback.getClass().getMethod("onException", Throwable.class); - arguments = new Object[] {new Object()}; - throwableArgs = new Object[] {new NullPointerException()}; - } - - @Test - public void testOnConstructor() { - constructInterceptor.onConstruct(enhancedInstance, null); - Assert.assertNull(enhancedInstance.getSkyWalkingDynamicField()); - } - - @Test - public void testResponse() throws Throwable { - enhancedInstance.setSkyWalkingDynamicField(contextSnapshot); - invokeInterceptor.beforeMethod(enhancedInstance, responseMethod, arguments, null, null); - invokeInterceptor.afterMethod(enhancedInstance, responseMethod, arguments, null, null); - - assertThat(segmentStorage.getTraceSegments().size(), is(1)); - TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); - List spans = SegmentHelper.getSpans(traceSegment); - assertThat(spans.size(), is(1)); - } - - @Test - public void testException() throws Throwable { - enhancedInstance.setSkyWalkingDynamicField(contextSnapshot); - exceptionInterceptor.beforeMethod(enhancedInstance, exceptionMethod, throwableArgs, null, null); - exceptionInterceptor.afterMethod(enhancedInstance, exceptionMethod, throwableArgs, null, null); - - assertThat(segmentStorage.getTraceSegments().size(), is(1)); - TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); - List spans = SegmentHelper.getSpans(traceSegment); - assertThat(spans.size(), is(1)); - } - - @Test - public void testResponseIntegrally() throws InterruptedException { - AbstractSpan testBegin = ContextManager.createLocalSpan("TestBegin"); - constructInterceptor.onConstruct(enhancedInstance, null); - CountDownLatch countDownLatch = new CountDownLatch(1); - ContextManager.stopSpan(testBegin); - - executor.execute(() -> { - invokeInterceptor.beforeMethod(enhancedInstance, responseMethod, arguments, null, null); - callback.onResponse(new Object()); - invokeInterceptor.afterMethod(enhancedInstance, responseMethod, arguments, null, null); - countDownLatch.countDown(); - }); - - countDownLatch.await(1000, TimeUnit.MILLISECONDS); - - assertEquals(2, segmentStorage.getTraceSegments().size()); - TraceSegment traceSegment1 = segmentStorage.getTraceSegments().get(0); - List spans1 = SegmentHelper.getSpans(traceSegment1); - assertEquals(1, spans1.size()); - TraceSegment traceSegment2 = segmentStorage.getTraceSegments().get(1); - List spans2 = SegmentHelper.getSpans(traceSegment2); - assertEquals(2, spans2.size()); - assertEquals("onResponse", spans2.get(0).getOperationName()); - - assertEquals(traceSegment1.getRelatedGlobalTrace().getId(), traceSegment2.getRef().getTraceId()); - } - -} diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptorTest.java new file mode 100644 index 0000000000..bc082d7d2b --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptorTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.sofarpc; + +import com.alipay.remoting.InvokeCallback; +import java.util.concurrent.Executor; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SofaBoltCallbackInvokeInterceptorTest { + InvokeCallback callback; + Object obj; + Object[] matchArgs; + Object[] mismatchArgs; + + @Before + public void before() { + callback = new InvokeCallback() { + @Override + public void onResponse(final Object o) { + + } + + @Override + public void onException(final Throwable throwable) { + + } + + @Override + public Executor getExecutor() { + return null; + } + }; + + obj = new Object(); + + matchArgs = new Object[] { + null, + null, + callback, + null + }; + mismatchArgs = new Object[] { + null, + null, + obj, + null + }; + } + + @Test + public void testOverrideArguments() { + final SofaBoltCallbackInvokeInterceptor interceptor = new SofaBoltCallbackInvokeInterceptor(); + interceptor.beforeMethod(null, null, matchArgs, null, null); + Assert.assertTrue(matchArgs[2] instanceof InvokeCallbackWrapper); + Assert.assertSame(callback, ((InvokeCallbackWrapper) matchArgs[2]).getInvokeCallback()); + + interceptor.beforeMethod(null, null, mismatchArgs, null, null); + Assert.assertSame(obj, mismatchArgs[2]); + } + +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java index 3dc0903ec0..c0c3420966 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java @@ -18,11 +18,11 @@ package org.apache.skywalking.apm.plugin.sofarpc; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.filter.ConsumerInvoker; import java.util.List; import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; @@ -50,11 +50,12 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import com.alipay.sofa.rpc.client.ProviderInfo; -import com.alipay.sofa.rpc.context.RpcInternalContext; -import com.alipay.sofa.rpc.core.request.SofaRequest; -import com.alipay.sofa.rpc.core.response.SofaResponse; -import com.alipay.sofa.rpc.filter.ConsumerInvoker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; @RunWith(TracingSegmentRunner.class) public class SofaRpcConsumerInterceptorTest { @@ -121,7 +122,8 @@ public void setUp() throws Exception { @Test public void testConsumerWithAttachment() throws Throwable { - sofaRpcConsumerInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); + sofaRpcConsumerInterceptor.beforeMethod( + enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); sofaRpcConsumerInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentTypes, sofaResponse); assertThat(segmentStorage.getTraceSegments().size(), is(1)); @@ -133,8 +135,10 @@ public void testConsumerWithAttachment() throws Throwable { @Test public void testConsumerWithException() throws Throwable { - sofaRpcConsumerInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); - sofaRpcConsumerInterceptor.handleMethodException(enhancedInstance, null, allArguments, argumentTypes, new RuntimeException()); + sofaRpcConsumerInterceptor.beforeMethod( + enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); + sofaRpcConsumerInterceptor.handleMethodException( + enhancedInstance, null, allArguments, argumentTypes, new RuntimeException()); sofaRpcConsumerInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentTypes, sofaResponse); assertThat(segmentStorage.getTraceSegments().size(), is(1)); TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); @@ -146,7 +150,8 @@ public void testConsumerWithResultHasException() throws Throwable { when(sofaResponse.isError()).thenReturn(true); when(sofaResponse.getAppResponse()).thenReturn(new RuntimeException()); - sofaRpcConsumerInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); + sofaRpcConsumerInterceptor.beforeMethod( + enhancedInstance, null, allArguments, argumentTypes, methodInterceptResult); sofaRpcConsumerInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentTypes, sofaResponse); assertThat(segmentStorage.getTraceSegments().size(), is(1)); @@ -180,8 +185,11 @@ private void assertCommonsAttribute(AbstractTracingSpan span) { assertThat(tags.size(), is(1)); assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.RPC_FRAMEWORK)); assertThat(SpanHelper.getComponentId(span), is(43)); - assertThat(tags.get(0) - .getValue(), is("bolt://127.0.0.1:12200/org.apache.skywalking.apm.test.TestSofaRpcService.test(String)")); + assertThat( + tags.get(0) + .getValue(), + is("bolt://127.0.0.1:12200/org.apache.skywalking.apm.test.TestSofaRpcService.test(String)") + ); assertThat(span.getOperationName(), is("org.apache.skywalking.apm.test.TestSofaRpcService.test(String)")); } } diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java index 43c2938e5e..80edc8016a 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java @@ -18,10 +18,11 @@ package org.apache.skywalking.apm.plugin.sofarpc; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.filter.ProviderInvoker; import java.util.List; import org.apache.skywalking.apm.agent.core.conf.Config; import org.apache.skywalking.apm.agent.core.context.SW8CarrierItem; @@ -51,11 +52,11 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import com.alipay.sofa.rpc.client.ProviderInfo; -import com.alipay.sofa.rpc.context.RpcInternalContext; -import com.alipay.sofa.rpc.core.request.SofaRequest; -import com.alipay.sofa.rpc.core.response.SofaResponse; -import com.alipay.sofa.rpc.filter.ProviderInvoker; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; @RunWith(TracingSegmentRunner.class) public class SofaRpcProviderInterceptorTest { From bb384e4ebc0da266c76be8ac6c32c9aab348c681 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Sun, 24 Mar 2024 21:24:05 +0800 Subject: [PATCH 07/10] Apply suggestions from code review --- .../skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java index d34b91d5f7..328d8cd0e6 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java @@ -34,7 +34,6 @@ public InvokeCallbackWrapper(InvokeCallback invokeCallback) { this.contextSnapshot = ContextManager.capture(); } this.invokeCallback = invokeCallback; - } @Override @@ -49,8 +48,8 @@ public void onResponse(final Object o) { ContextManager.activeSpan().log(t); throw t; } finally { - ContextManager.stopSpan(); contextSnapshot = null; + ContextManager.stopSpan(); } } @@ -73,8 +72,8 @@ public void onException(final Throwable throwable) { ContextManager.activeSpan().log(t); throw t; } finally { - ContextManager.stopSpan(); contextSnapshot = null; + ContextManager.stopSpan(); } } From 5cccf8f06f35ee3766da96bcb9e82715980826fc Mon Sep 17 00:00:00 2001 From: Daniel Zhang Date: Mon, 25 Mar 2024 09:16:51 +0800 Subject: [PATCH 08/10] Add lombok annotations for getter methods. --- .../apm/plugin/sofarpc/InvokeCallbackWrapper.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java index 328d8cd0e6..e73ed8161a 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java @@ -20,13 +20,16 @@ import com.alipay.remoting.InvokeCallback; import java.util.concurrent.Executor; +import lombok.Getter; import org.apache.skywalking.apm.agent.core.context.ContextManager; import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; public class InvokeCallbackWrapper implements InvokeCallback { + @Getter private ContextSnapshot contextSnapshot; + @Getter private final InvokeCallback invokeCallback; public InvokeCallbackWrapper(InvokeCallback invokeCallback) { @@ -81,12 +84,4 @@ public void onException(final Throwable throwable) { public Executor getExecutor() { return invokeCallback.getExecutor(); } - - protected ContextSnapshot getContextSnapshot() { - return contextSnapshot; - } - - protected InvokeCallback getInvokeCallback() { - return invokeCallback; - } } From 05b43578fdc1bffd7cd3d8eb5851d5388c07b068 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Mon, 25 Mar 2024 09:44:49 +0800 Subject: [PATCH 09/10] Apply suggestions from code review --- .../skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java index e73ed8161a..5b1526f5db 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java @@ -27,9 +27,9 @@ public class InvokeCallbackWrapper implements InvokeCallback { - @Getter + @Getter(AccessLevel.PACKAGE) private ContextSnapshot contextSnapshot; - @Getter + @Getter(AccessLevel.PACKAGE) private final InvokeCallback invokeCallback; public InvokeCallbackWrapper(InvokeCallback invokeCallback) { From 49ffb3f657997988b329371b85f4b46b66405099 Mon Sep 17 00:00:00 2001 From: Daniel Zhang Date: Mon, 25 Mar 2024 10:05:10 +0800 Subject: [PATCH 10/10] Add lombok import statement --- .../skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java | 1 + 1 file changed, 1 insertion(+) diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java index 5b1526f5db..cf806cb592 100644 --- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java +++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java @@ -20,6 +20,7 @@ import com.alipay.remoting.InvokeCallback; import java.util.concurrent.Executor; +import lombok.AccessLevel; import lombok.Getter; import org.apache.skywalking.apm.agent.core.context.ContextManager; import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;