Skip to content

Commit

Permalink
Merge pull request quarkusio#26332 from radcortez/fix-25336
Browse files Browse the repository at this point in the history
Always create a duplicated context in OpenTelemetry when executing client requests
  • Loading branch information
gsmet authored Jul 4, 2022
2 parents b9d7536 + 6e095f4 commit 64f47d0
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_TARGET;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_URL;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.stream.Collectors.toSet;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URI;
Expand All @@ -27,6 +28,8 @@
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.Router;
Expand Down Expand Up @@ -114,14 +117,40 @@ void path() throws Exception {
assertEquals(server.getParentSpanId(), client.getSpanId());
}

@Test
void multiple() throws Exception {
HttpResponse<Buffer> response = WebClient.create(vertx)
.get(uri.getPort(), uri.getHost(), "/multiple")
.putHeader("host", uri.getHost())
.putHeader("port", uri.getPort() + "")
.send()
.toCompletionStage().toCompletableFuture()
.get();

assertEquals(HTTP_OK, response.statusCode());

List<SpanData> spans = spanExporter.getFinishedSpanItems(6);
assertEquals(1, spans.stream().map(SpanData::getTraceId).collect(toSet()).size());
}

@ApplicationScoped
public static class HelloRouter {
@Inject
Router router;
@Inject
Vertx vertx;

public void register(@Observes StartupEvent ev) {
router.get("/hello").handler(rc -> rc.response().end("hello"));
router.get("/hello/:name").handler(rc -> rc.response().end("hello " + rc.pathParam("name")));
router.get("/multiple").handler(rc -> {
String host = rc.request().getHeader("host");
int port = Integer.parseInt(rc.request().getHeader("port"));
WebClient webClient = WebClient.create(vertx);
Future<HttpResponse<Buffer>> one = webClient.get(port, host, "/hello/naruto").send();
Future<HttpResponse<Buffer>> two = webClient.get(port, host, "/hello/goku").send();
CompositeFuture.join(one, two).onComplete(event -> rc.response().end());
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ public OpenTelemetryClientFilter(final OpenTelemetry openTelemetry) {

@Override
public void filter(final ClientRequestContext request) {
Context parentContext = Context.current();
io.vertx.core.Context vertxContext = getVertxContext(request);
io.opentelemetry.context.Context parentContext = QuarkusContextStorage.getContext(vertxContext);
if (parentContext == null) {
parentContext = io.opentelemetry.context.Context.current();
}
if (instrumenter.shouldStart(parentContext, request)) {
Context spanContext = instrumenter.start(parentContext, request);
Scope scope = QuarkusContextStorage.INSTANCE.attach(getVertxContext(request), spanContext);
Scope scope = QuarkusContextStorage.INSTANCE.attach(vertxContext, spanContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_CONTEXT, spanContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_PARENT_CONTEXT, parentContext);
request.setProperty(REST_CLIENT_OTEL_SPAN_CLIENT_SCOPE, scope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ default <R> SpanOperation sendRequest(
if (instrumenter.shouldStart(parentContext, (REQ) request)) {
io.opentelemetry.context.Context spanContext = instrumenter.start(parentContext,
writableHeaders((REQ) request, headers));
Context duplicatedContext = VertxContext.getOrCreateDuplicatedContext(context);
Context duplicatedContext = VertxContext.createNewDuplicatedContext(context);
setContextSafe(duplicatedContext, true);
Scope scope = QuarkusContextStorage.INSTANCE.attach(duplicatedContext, spanContext);
return spanOperation(duplicatedContext, (REQ) request, toMultiMap(headers), spanContext, scope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ public ClientRequestContextImpl(RestClientRequestContext restClientRequestContex
this.configuration = configuration;
this.headersMap = new ClientRequestHeadersMap(); //restClientRequestContext.requestHeaders.getHeaders()

// Capture or create a duplicated context, and store it.
// Always create a duplicated context because each REST Client invocation must have its own context
// A separate context allows integrations like OTel to create a separate Span for each invocation (expected)
Context current = client.vertx.getOrCreateContext();
this.context = VertxContext.getOrCreateDuplicatedContext(current);
this.context = VertxContext.createNewDuplicatedContext(current);
restClientRequestContext.properties.put(VERTX_CONTEXT_PROPERTY, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;

import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.smallrye.mutiny.Uni;
Expand All @@ -16,6 +18,9 @@
public class ReactiveResource {
@Inject
Tracer tracer;
@Inject
@RestClient
ReactiveRestClient client;

@GET
public Uni<String> helloGet(@QueryParam("name") String name) {
Expand All @@ -24,6 +29,13 @@ public Uni<String> helloGet(@QueryParam("name") String name) {
.eventually((Runnable) span::end);
}

@GET
@Path("/multiple")
public Uni<String> helloMultiple() {
return Uni.combine().all().unis(client.helloGet("Naruto"), client.helloGet("Goku"))
.combinedWith((s, s2) -> s + " and " + s2);
}

@POST
public Uni<String> helloPost(String body) {
Span span = tracer.spanBuilder("helloPost").startSpan();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.it.opentelemetry.reactive;

import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;

import io.smallrye.mutiny.Uni;

@RegisterRestClient(configKey = "client")
@Path("/reactive")
interface ReactiveRestClient {
@GET
Uni<String> helloGet(@QueryParam("name") String name);

@POST
Uni<String> helloPost(String body);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -111,16 +108,6 @@ void post() {
assertEquals(HttpMethod.POST.name(), ((Map<?, ?>) client.get("attributes")).get(HTTP_METHOD.getKey()));
}

@RegisterRestClient(configKey = "client")
@Path("/reactive")
interface ReactiveRestClient {
@GET
Uni<String> helloGet(@QueryParam("name") String name);

@POST
Uni<String> helloPost(String body);
}

private static List<Map<String, Object>> getSpans() {
return when().get("/export").body().as(new TypeRef<>() {
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.quarkus.it.opentelemetry.reactive;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
public class OpenTelemetryReactiveIT extends OpenTelemetryReactiveTest {
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.restassured.RestAssured.given;
import static io.restassured.RestAssured.when;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.stream.Collectors.toSet;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -59,6 +60,23 @@ void post() {
assertEquals(spans.get(0).get("traceId"), spans.get(1).get("traceId"));
}

@Test
void multiple() {
given()
.contentType("application/json")
.when()
.get("/reactive/multiple")
.then()
.statusCode(200)
.body(equalTo("Hello Naruto and Hello Goku"));

await().atMost(5, TimeUnit.SECONDS).until(() -> getSpans().size() == 7);

List<Map<String, Object>> spans = getSpans();
assertEquals(7, spans.size());
assertEquals(1, spans.stream().map(map -> map.get("traceId")).collect(toSet()).size());
}

private static List<Map<String, Object>> getSpans() {
return when().get("/export").body().as(new TypeRef<>() {
});
Expand Down

0 comments on commit 64f47d0

Please sign in to comment.