Skip to content

Commit

Permalink
Merge #3164 into 2.0.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Apr 17, 2024
2 parents 6551885 + 0daf7e8 commit 2f9f7d5
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 57 deletions.
10 changes: 4 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ ext {
}

//Metrics
micrometerVersion = '1.10.0' //optional baseline
micrometerTracingVersion = '1.0.0' //optional baseline
micrometerDocsVersion = '1.0.0' //optional baseline
micrometerVersion = '1.13.0-SNAPSHOT' //optional baseline
micrometerTracingVersion = '1.3.0-SNAPSHOT' //optional baseline
micrometerDocsVersion = '1.0.2' //optional baseline

contextPropagationDefaultVersion = '1.0.0' //optional baseline
contextPropagationDefaultVersion = '1.1.0' //optional baseline
if (!project.hasProperty("forceContextPropagationVersion")) {
contextPropagationVersion = contextPropagationDefaultVersion
}
Expand Down Expand Up @@ -287,8 +287,6 @@ subprojects {
if (project.hasProperty("forceTransport")) {
systemProperty("forceTransport", forceTransport)
}
systemProperty("reactorCoreVersionMinor", VersionNumber.parse(reactorCoreVersion.toString()).minor)
systemProperty("contextPropagationVersionMicro", VersionNumber.parse(contextPropagationVersion.toString()).micro)
scanForTestClasses = false
include '**/*Tests.*'
include '**/*Test.*'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
Expand Down Expand Up @@ -73,7 +74,7 @@ public boolean isSharable() {
@Override
@SuppressWarnings("try")
public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(ctx.channel())) {
try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
}
System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
Expand Down Expand Up @@ -73,7 +74,7 @@ public boolean isSharable() {
@Override
@SuppressWarnings("try")
public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(ctx.channel())) {
try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
}
System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
Expand Down Expand Up @@ -68,7 +69,7 @@ public boolean isSharable() {
@Override
@SuppressWarnings("try")
public Future<Void> connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress) {
try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(ctx.channel())) {
try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
}
System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import brave.propagation.StrictCurrentTraceContext;
import brave.sampler.Sampler;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
Expand Down Expand Up @@ -70,7 +71,7 @@ static final class CustomChannelInboundHandler extends ChannelHandlerAdapter {
@Override
@SuppressWarnings("try")
public void channelActive(ChannelHandlerContext ctx) {
try (ContextSnapshot.Scope scope = ContextSnapshot.setAllThreadLocalsFrom(ctx.channel())) {
try (ContextSnapshot.Scope scope = ContextSnapshotFactory.builder().build().setThreadLocalsFrom(ctx.channel())) {
System.out.println("Current Observation in Scope: " + OBSERVATION_REGISTRY.getCurrentObservation());
}
System.out.println("Current Observation: " + OBSERVATION_REGISTRY.getCurrentObservation());
Expand Down
7 changes: 6 additions & 1 deletion reactor-netty5-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,12 @@ dependencies {
// JSR-305 annotations
testCompileOnly "com.google.code.findbugs:jsr305:$jsr305Version"

testImplementation "org.mockito:mockito-core:$mockitoVersion"
testImplementation ("org.mockito:mockito-core") {
// Workaround for https://github.com/micrometer-metrics/micrometer/issues/4968
version {
strictly "$mockitoVersion"
}
}
testImplementation "io.specto:hoverfly-java-junit5:$hoverflyJavaVersion"
testImplementation "org.apache.tomcat.embed:tomcat-embed-core:$tomcatVersion"
testImplementation "io.projectreactor:reactor-test:$testAddonVersion"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -124,23 +124,14 @@ void testContextPropagation(HttpClient client) {
@ParameterizedTest
@MethodSource("httpClientCombinations")
void testAutomaticContextPropagation(HttpClient client) {
String reactorCoreVersionMinor = System.getProperty("reactorCoreVersionMinor");
String contextPropagationVersionMicro = System.getProperty("contextPropagationVersionMicro");

boolean enableAutomaticContextPropagation =
reactorCoreVersionMinor != null && !reactorCoreVersionMinor.isEmpty() && Integer.parseInt(reactorCoreVersionMinor) >= 6 && // 3.6.x
contextPropagationVersionMicro != null && !contextPropagationVersionMicro.isEmpty() && Integer.parseInt(contextPropagationVersionMicro) >= 5; // 1.0.5 or above

HttpServer server = client.configuration().sslProvider() != null ?
baseServer.secure(spec -> spec.sslContext(serverCtx)).protocol(HttpProtocol.HTTP11, HttpProtocol.H2) :
baseServer.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C);

disposableServer = server.bindNow();

try {
if (enableAutomaticContextPropagation) {
Hooks.enableAutomaticContextPropagation();
}
Hooks.enableAutomaticContextPropagation();

registry.registerThreadLocalAccessor(new TestThreadLocalAccessor());

Expand All @@ -153,17 +144,15 @@ void testAutomaticContextPropagation(HttpClient client) {
.uri("/")
.send(BufferMono.fromString(Mono.just("test")));

response(responseReceiver, enableAutomaticContextPropagation);
responseConnection(responseReceiver, enableAutomaticContextPropagation);
responseContent(responseReceiver, enableAutomaticContextPropagation);
responseSingle(responseReceiver, enableAutomaticContextPropagation);
response(responseReceiver);
responseConnection(responseReceiver);
responseContent(responseReceiver);
responseSingle(responseReceiver);
}
finally {
TestThreadLocalHolder.reset();
registry.removeThreadLocalAccessor(TestThreadLocalAccessor.KEY);
if (enableAutomaticContextPropagation) {
Hooks.disableAutomaticContextPropagation();
}
Hooks.disableAutomaticContextPropagation();
disposableServer.disposeNow();
}
}
Expand Down Expand Up @@ -208,63 +197,43 @@ static Object[] httpClientCombinations() {
};
}

static void response(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
static void response(HttpClient.ResponseReceiver<?> responseReceiver) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.response()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}

static void responseConnection(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
static void responseConnection(HttpClient.ResponseReceiver<?> responseReceiver) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.responseConnection((res, conn) -> conn.inbound().receive().aggregate().asString())
.next()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}

static void responseContent(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
static void responseContent(HttpClient.ResponseReceiver<?> responseReceiver) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.responseContent()
.aggregate()
.asString()
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}

static void responseSingle(HttpClient.ResponseReceiver<?> responseReceiver, boolean enableAutomaticContextPropagation) {
static void responseSingle(HttpClient.ResponseReceiver<?> responseReceiver) {
AtomicReference<String> threadLocal = new AtomicReference<>();
responseReceiver.responseSingle((res, bytes) -> bytes.asString())
.doOnNext(s -> threadLocal.set(TestThreadLocalHolder.value()))
.block(Duration.ofSeconds(5));

if (enableAutomaticContextPropagation) {
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}
else {
assertThat(threadLocal.get()).isNull();
}
assertThat(threadLocal.get()).isNotNull().isEqualTo("First");
}

static void sendRequest(HttpClient client, String expectation) {
Expand Down

0 comments on commit 2f9f7d5

Please sign in to comment.