From 06c60d97e930fe932fc148e40aa5031372a210c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?SampsonYe=28=E5=8F=B6=E9=A3=9E=29?= Date: Thu, 15 Sep 2022 16:18:55 +0800 Subject: [PATCH] fix: continue with origin trace info when pub/sub retry (#507) --- .../CapDiagnosticProcessor.cs | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/SkyApm.Diagnostics.CAP/CapDiagnosticProcessor.cs b/src/SkyApm.Diagnostics.CAP/CapDiagnosticProcessor.cs index f60859e9..403222f3 100644 --- a/src/SkyApm.Diagnostics.CAP/CapDiagnosticProcessor.cs +++ b/src/SkyApm.Diagnostics.CAP/CapDiagnosticProcessor.cs @@ -106,11 +106,24 @@ public void ErrorPublishStore([Object] CapEventDataPubStore eventData) [DiagnosticName(CapEvents.BeforePublish)] public void BeforePublish([Object] CapEventDataPubSend eventData) { - _localSegmentContextAccessor.Context = _contexts[eventData.TransportMessage.GetId()]; - + SegmentContext context = null; var host = eventData.BrokerAddress.Endpoint.Replace("-1", "5672"); - var context = _tracingContext.CreateExitSegmentContext(OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, - host, new CapCarrierHeaderCollection(eventData.TransportMessage)); + if (_contexts.TryGetValue(eventData.TransportMessage.GetId(),out var ctx)) + { + _localSegmentContextAccessor.Context = ctx; + context = _tracingContext.CreateExitSegmentContext(OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, + host, new CapCarrierHeaderCollection(eventData.TransportMessage)); + + } + else + { + // may be come from retry loop + var carrierHeader = new CapCarrierHeaderCollection(eventData.TransportMessage); + var eventName = OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix; + var operationName = OperateNamePrefix + eventName + ConsumerOperateNameSuffix; + context = _tracingContext.CreateEntrySegmentContext(operationName, carrierHeader); + } + context.Span.SpanLayer = SpanLayer.MQ; context.Span.Component = GetComponent(eventData.BrokerAddress, true); @@ -205,9 +218,21 @@ public void CapErrorConsume([Object] CapEventDataSubStore eventData) [DiagnosticName(CapEvents.BeforeSubscriberInvoke)] public void CapBeforeSubscriberInvoke([Object] CapEventDataSubExecute eventData) { - _entrySegmentContextAccessor.Context = _contexts[eventData.Message.GetId() + eventData.Message.GetGroup()]; + SegmentContext context = null; + if (_contexts.TryGetValue(eventData.Message.GetId() + eventData.Message.GetGroup(),out var ctx)) + { + _entrySegmentContextAccessor.Context = ctx; + context = _tracingContext.CreateLocalSegmentContext("Subscriber Invoke: " + eventData.MethodInfo.Name); + } + else + { + // may be come from retry loop + var carrierHeader = new CapCarrierHeaderCollection(eventData.Message); + var eventName = eventData.Message.GetGroup() + "/" + eventData.Operation; + var operationName = OperateNamePrefix + eventName + ConsumerOperateNameSuffix; + context = _tracingContext.CreateEntrySegmentContext(operationName, carrierHeader); + } - var context = _tracingContext.CreateLocalSegmentContext("Subscriber Invoke: " + eventData.MethodInfo.Name); context.Span.SpanLayer = SpanLayer.MQ; context.Span.Component = Components.CAP; context.Span.AddLog(LogEvent.Event("Subscriber Invoke Start"));