diff --git a/gradle.properties b/gradle.properties index 7b5ac2349..237ba8625 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,5 +11,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=1.1.3 +version=1.1.4-SNAPSHOT perfBaselineVersion=1.1.2 diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationRequesterRSocketProxy.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationRequesterRSocketProxy.java index 5a89071b4..fb80ea317 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationRequesterRSocketProxy.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationRequesterRSocketProxy.java @@ -32,6 +32,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.annotation.Nullable; import reactor.util.context.ContextView; /** @@ -43,13 +44,24 @@ */ public class ObservationRequesterRSocketProxy extends RSocketProxy { + /** Aligned with ObservationThreadLocalAccessor#KEY */ + private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation"; + private final ObservationRegistry observationRegistry; - private RSocketRequesterObservationConvention observationConvention; + @Nullable private final RSocketRequesterObservationConvention observationConvention; public ObservationRequesterRSocketProxy(RSocket source, ObservationRegistry observationRegistry) { + this(source, observationRegistry, null); + } + + public ObservationRequesterRSocketProxy( + RSocket source, + ObservationRegistry observationRegistry, + RSocketRequesterObservationConvention observationConvention) { super(source); this.observationRegistry = observationRegistry; + this.observationConvention = observationConvention; } @Override @@ -76,15 +88,7 @@ Mono setObservation( FrameType frameType, ObservationDocumentation observation) { return Mono.deferContextual( - contextView -> { - if (contextView.hasKey(Observation.class)) { - Observation parent = contextView.get(Observation.class); - try (Observation.Scope scope = parent.openScope()) { - return observe(input, payload, frameType, observation); - } - } - return observe(input, payload, frameType, observation); - }); + contextView -> observe(input, payload, frameType, observation, contextView)); } private String route(Payload payload) { @@ -107,18 +111,22 @@ private Mono observe( Function> input, Payload payload, FrameType frameType, - ObservationDocumentation obs) { + ObservationDocumentation obs, + ContextView contextView) { String route = route(payload); RSocketContext rSocketContext = new RSocketContext( payload, payload.sliceMetadata(), frameType, route, RSocketContext.Side.REQUESTER); + Observation parentObservation = contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null); Observation observation = - obs.start( - this.observationConvention, - new DefaultRSocketRequesterObservationConvention(rSocketContext), - () -> rSocketContext, - observationRegistry); + obs.observation( + this.observationConvention, + new DefaultRSocketRequesterObservationConvention(rSocketContext), + () -> rSocketContext, + observationRegistry) + .parentObservation(parentObservation); setContextualName(frameType, route, observation); + observation.start(); Payload newPayload = payload; if (rSocketContext.modifiedPayload != null) { newPayload = rSocketContext.modifiedPayload; @@ -126,26 +134,17 @@ private Mono observe( return input .apply(newPayload) .doOnError(observation::error) - .doFinally(signalType -> observation.stop()); - } - - private Observation observation(ContextView contextView) { - if (contextView.hasKey(Observation.class)) { - return contextView.get(Observation.class); - } - return null; + .doFinally(signalType -> observation.stop()) + .contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation)); } @Override public Flux requestStream(Payload payload) { - return Flux.deferContextual( - contextView -> - setObservation( - super::requestStream, - payload, - contextView, - FrameType.REQUEST_STREAM, - RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM)); + return observationFlux( + super::requestStream, + payload, + FrameType.REQUEST_STREAM, + RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM); } @Override @@ -155,10 +154,9 @@ public Flux requestChannel(Publisher inbound) { (firstSignal, flux) -> { final Payload firstPayload = firstSignal.get(); if (firstPayload != null) { - return setObservation( + return observationFlux( p -> super.requestChannel(flux.skip(1).startWith(p)), firstPayload, - firstSignal.getContextView(), FrameType.REQUEST_CHANNEL, RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_CHANNEL); } @@ -166,21 +164,6 @@ public Flux requestChannel(Publisher inbound) { }); } - private Flux setObservation( - Function> input, - Payload payload, - ContextView contextView, - FrameType frameType, - ObservationDocumentation obs) { - Observation parentObservation = observation(contextView); - if (parentObservation == null) { - return observationFlux(input, payload, frameType, obs); - } - try (Observation.Scope scope = parentObservation.openScope()) { - return observationFlux(input, payload, frameType, obs); - } - } - private Flux observationFlux( Function> input, Payload payload, @@ -196,17 +179,22 @@ private Flux observationFlux( frameType, route, RSocketContext.Side.REQUESTER); + Observation parentObservation = + contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null); Observation newObservation = - obs.start( - this.observationConvention, - new DefaultRSocketRequesterObservationConvention(rSocketContext), - () -> rSocketContext, - this.observationRegistry); + obs.observation( + this.observationConvention, + new DefaultRSocketRequesterObservationConvention(rSocketContext), + () -> rSocketContext, + this.observationRegistry) + .parentObservation(parentObservation); setContextualName(frameType, route, newObservation); + newObservation.start(); return input .apply(rSocketContext.modifiedPayload) .doOnError(newObservation::error) - .doFinally(signalType -> newObservation.stop()); + .doFinally(signalType -> newObservation.stop()) + .contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation)); }); } @@ -217,8 +205,4 @@ private void setContextualName(FrameType frameType, String route, Observation ne newObservation.contextualName(frameType.name()); } } - - public void setObservationConvention(RSocketRequesterObservationConvention convention) { - this.observationConvention = convention; - } } diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationResponderRSocketProxy.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationResponderRSocketProxy.java index 47c05f76c..9ed27adf3 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationResponderRSocketProxy.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/ObservationResponderRSocketProxy.java @@ -30,6 +30,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.annotation.Nullable; /** * Tracing representation of a {@link RSocketProxy} for the responder. @@ -39,14 +40,24 @@ * @since 1.1.4 */ public class ObservationResponderRSocketProxy extends RSocketProxy { + /** Aligned with ObservationThreadLocalAccessor#KEY */ + private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation"; private final ObservationRegistry observationRegistry; - private RSocketResponderObservationConvention observationConvention; + @Nullable private final RSocketResponderObservationConvention observationConvention; public ObservationResponderRSocketProxy(RSocket source, ObservationRegistry observationRegistry) { + this(source, observationRegistry, null); + } + + public ObservationResponderRSocketProxy( + RSocket source, + ObservationRegistry observationRegistry, + RSocketResponderObservationConvention observationConvention) { super(source); this.observationRegistry = observationRegistry; + this.observationConvention = observationConvention; } @Override @@ -66,7 +77,8 @@ public Mono fireAndForget(Payload payload) { startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_FNF, rSocketContext); return super.fireAndForget(rSocketContext.modifiedPayload) .doOnError(newObservation::error) - .doFinally(signalType -> newObservation.stop()); + .doFinally(signalType -> newObservation.stop()) + .contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation)); } private Observation startObservation( @@ -94,7 +106,8 @@ public Mono requestResponse(Payload payload) { RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_RESPONSE, rSocketContext); return super.requestResponse(rSocketContext.modifiedPayload) .doOnError(newObservation::error) - .doFinally(signalType -> newObservation.stop()); + .doFinally(signalType -> newObservation.stop()) + .contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation)); } @Override @@ -109,7 +122,8 @@ public Flux requestStream(Payload payload) { RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_STREAM, rSocketContext); return super.requestStream(rSocketContext.modifiedPayload) .doOnError(newObservation::error) - .doFinally(signalType -> newObservation.stop()); + .doFinally(signalType -> newObservation.stop()) + .contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation)); } @Override @@ -137,7 +151,9 @@ public Flux requestChannel(Publisher payloads) { } return super.requestChannel(flux.skip(1).startWith(rSocketContext.modifiedPayload)) .doOnError(newObservation::error) - .doFinally(signalType -> newObservation.stop()); + .doFinally(signalType -> newObservation.stop()) + .contextWrite( + context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation)); } return flux; }); @@ -160,8 +176,4 @@ private String route(Payload payload, ByteBuf headers) { } return null; } - - public void setObservationConvention(RSocketResponderObservationConvention convention) { - this.observationConvention = convention; - } } diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/RSocketRequesterTracingObservationHandler.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/RSocketRequesterTracingObservationHandler.java index 2cb3450d2..996267d4a 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/RSocketRequesterTracingObservationHandler.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/observation/RSocketRequesterTracingObservationHandler.java @@ -69,6 +69,10 @@ public Tracer getTracer() { public void onStart(RSocketContext context) { Payload payload = context.payload; Span.Builder spanBuilder = this.tracer.spanBuilder(); + Span parentSpan = getParentSpan(context); + if (parentSpan != null) { + spanBuilder.setParent(parentSpan.context()); + } Span span = spanBuilder.kind(Span.Kind.PRODUCER).start(); log.debug("Extracted result from context or thread local {}", span); // TODO: newmetadata returns an empty composite byte buf