diff --git a/monitoring/opentelemetry-reactive/pom.xml b/monitoring/opentelemetry-reactive/pom.xml index 2c3093c2a..60e95c552 100644 --- a/monitoring/opentelemetry-reactive/pom.xml +++ b/monitoring/opentelemetry-reactive/pom.xml @@ -27,10 +27,58 @@ io.quarkus quarkus-opentelemetry + + io.quarkus + quarkus-grpc + io.quarkus.qe quarkus-test-service-jaeger test + + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + + + + generate-code + generate-code-tests + build + + + + + + + + + + skip-tests-on-windows + + + windows + + + + + + maven-surefire-plugin + + true + + + + maven-failsafe-plugin + + true + + + + + + diff --git a/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/grpc/GrpcPingResource.java b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/grpc/GrpcPingResource.java new file mode 100644 index 000000000..d936dcaf9 --- /dev/null +++ b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/grpc/GrpcPingResource.java @@ -0,0 +1,28 @@ +package io.quarkus.ts.opentelemetry.reactive.grpc; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import io.quarkus.example.PongRequest; +import io.quarkus.example.PongServiceGrpc; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource; + +@Path("/grpc-ping") +public class GrpcPingResource extends TraceableResource { + + @Inject + @GrpcClient("pong") + PongServiceGrpc.PongServiceBlockingStub pongClient; + + @GET + @Produces(MediaType.TEXT_PLAIN) + public String getPing() { + recordTraceId(); + + return "ping " + pongClient.sayPong(PongRequest.newBuilder().build()).getMessage(); + } +} diff --git a/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/grpc/GrpcPongResource.java b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/grpc/GrpcPongResource.java new file mode 100644 index 000000000..bdce6714e --- /dev/null +++ b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/grpc/GrpcPongResource.java @@ -0,0 +1,34 @@ +package io.quarkus.ts.opentelemetry.reactive.grpc; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.jboss.logging.Logger; + +import io.quarkus.example.LastTraceIdRequest; +import io.quarkus.example.PongServiceGrpc; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource; + +@Path("/grpc-pong") +public class GrpcPongResource { + + @Inject + @GrpcClient("pong") + PongServiceGrpc.PongServiceBlockingStub pongClient; + + private static final Logger LOG = Logger.getLogger(TraceableResource.class); + + @GET + @Path("/lastTraceId") + @Produces(MediaType.TEXT_PLAIN) + public String getLastTraceId() { + String lastTraceId = pongClient.returnLastTraceId(LastTraceIdRequest.newBuilder().build()).getMessage(); + LOG.info("Recorded trace ID: " + lastTraceId); + return lastTraceId; + } + +} diff --git a/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/grpc/GrpcPongService.java b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/grpc/GrpcPongService.java new file mode 100644 index 000000000..9c112f1cb --- /dev/null +++ b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/grpc/GrpcPongService.java @@ -0,0 +1,34 @@ +package io.quarkus.ts.opentelemetry.reactive.grpc; + +import org.jboss.logmanager.MDC; + +import io.grpc.stub.StreamObserver; +import io.quarkus.example.LastTraceIdReply; +import io.quarkus.example.LastTraceIdRequest; +import io.quarkus.example.PongReply; +import io.quarkus.example.PongRequest; +import io.quarkus.example.PongServiceGrpc; +import io.quarkus.grpc.GrpcService; + +@GrpcService +public class GrpcPongService extends PongServiceGrpc.PongServiceImplBase { + + private String lastTraceId; + + @Override + public void sayPong(PongRequest request, StreamObserver responseObserver) { + lastTraceId = MDC.get("traceId"); + responseObserver.onNext(PongReply.newBuilder().setMessage("pong").build()); + responseObserver.onCompleted(); + } + + @Override + public void returnLastTraceId(LastTraceIdRequest request, StreamObserver responseObserver) { + responseObserver.onNext(LastTraceIdReply.newBuilder().setMessage(getLastTraceId()).build()); + responseObserver.onCompleted(); + } + + public String getLastTraceId() { + return lastTraceId; + } +} diff --git a/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/sse/ServerSentEventsPingResource.java b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/sse/ServerSentEventsPingResource.java new file mode 100644 index 000000000..801eb6e13 --- /dev/null +++ b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/sse/ServerSentEventsPingResource.java @@ -0,0 +1,27 @@ +package io.quarkus.ts.opentelemetry.reactive.sse; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RestClient; + +import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource; +import io.smallrye.mutiny.Multi; + +@Path("/server-sent-events-ping") +public class ServerSentEventsPingResource extends TraceableResource { + + @Inject + @RestClient + ServerSentEventsPongClient pongClient; + + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public Multi getPing() { + recordTraceId(); + return pongClient.getPong().map(response -> "ping " + response); + } +} diff --git a/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/sse/ServerSentEventsPongClient.java b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/sse/ServerSentEventsPongClient.java new file mode 100644 index 000000000..3ae017944 --- /dev/null +++ b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/sse/ServerSentEventsPongClient.java @@ -0,0 +1,19 @@ +package io.quarkus.ts.opentelemetry.reactive.sse; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; + +import io.smallrye.mutiny.Multi; + +@RegisterRestClient +public interface ServerSentEventsPongClient { + @GET + @Path("/server-sent-events-pong") + @Produces(MediaType.SERVER_SENT_EVENTS) + Multi getPong(); + +} diff --git a/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/sse/ServerSentEventsPongResource.java b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/sse/ServerSentEventsPongResource.java new file mode 100644 index 000000000..84a52c208 --- /dev/null +++ b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/sse/ServerSentEventsPongResource.java @@ -0,0 +1,20 @@ +package io.quarkus.ts.opentelemetry.reactive.sse; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource; +import io.smallrye.mutiny.Multi; + +@Path("/server-sent-events-pong") +public class ServerSentEventsPongResource extends TraceableResource { + + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public Multi getPong() { + recordTraceId(); + return Multi.createFrom().item("pong"); + } +} diff --git a/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/traceable/TraceableResource.java b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/traceable/TraceableResource.java new file mode 100644 index 000000000..7cb825458 --- /dev/null +++ b/monitoring/opentelemetry-reactive/src/main/java/io/quarkus/ts/opentelemetry/reactive/traceable/TraceableResource.java @@ -0,0 +1,28 @@ +package io.quarkus.ts.opentelemetry.reactive.traceable; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.jboss.logging.Logger; +import org.jboss.logmanager.MDC; + +public abstract class TraceableResource { + + private static final Logger LOG = Logger.getLogger(TraceableResource.class); + + private String lastTraceId; + + @GET + @Path("/lastTraceId") + @Produces(MediaType.TEXT_PLAIN) + public String getLastTraceId() { + return lastTraceId; + } + + protected void recordTraceId() { + lastTraceId = MDC.get("traceId"); + LOG.info("Recorded trace ID: " + lastTraceId); + } +} diff --git a/monitoring/opentelemetry-reactive/src/main/proto/pong.proto b/monitoring/opentelemetry-reactive/src/main/proto/pong.proto new file mode 100644 index 000000000..a62e5faec --- /dev/null +++ b/monitoring/opentelemetry-reactive/src/main/proto/pong.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.quarkus.example"; +option java_outer_classname = "PongProto"; + +package io.quarkus.example; + +service PongService { + rpc SayPong (PongRequest) returns (PongReply) {} + rpc ReturnLastTraceId (LastTraceIdRequest) returns (LastTraceIdReply) {} +} + +message PongRequest { +} + +message PongReply { + string message = 1; +} + +message LastTraceIdRequest { +} + +message LastTraceIdReply { + string message = 1; +} \ No newline at end of file diff --git a/monitoring/opentelemetry-reactive/src/main/resources/application.properties b/monitoring/opentelemetry-reactive/src/main/resources/application.properties index 3cbbc00ae..31d273872 100644 --- a/monitoring/opentelemetry-reactive/src/main/resources/application.properties +++ b/monitoring/opentelemetry-reactive/src/main/resources/application.properties @@ -1,3 +1,10 @@ io.quarkus.ts.opentelemetry.reactive.PingPongService/mp-rest/url=${pongservice_url}:${pongservice_port} io.quarkus.ts.opentelemetry.reactive.PingPongService/mp-rest/scope=javax.inject.Singleton +io.quarkus.ts.opentelemetry.reactive.sse.ServerSentEventsPongClient/mp-rest/url=http://localhost:${quarkus.http.port} +io.quarkus.ts.opentelemetry.reactive.sse.ServerSentEventsPongClient/mp-rest/scope=javax.inject.Singleton + +# gRPC +quarkus.grpc.clients.pong.host=localhost + +quarkus.application.name=pingpong \ No newline at end of file diff --git a/monitoring/opentelemetry-reactive/src/test/java/io/quarkus/ts/opentelemetry/reactive/OpenTelemetryGrpcIT.java b/monitoring/opentelemetry-reactive/src/test/java/io/quarkus/ts/opentelemetry/reactive/OpenTelemetryGrpcIT.java new file mode 100644 index 000000000..57d49c2f1 --- /dev/null +++ b/monitoring/opentelemetry-reactive/src/test/java/io/quarkus/ts/opentelemetry/reactive/OpenTelemetryGrpcIT.java @@ -0,0 +1,65 @@ +package io.quarkus.ts.opentelemetry.reactive; + +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.apache.http.HttpStatus; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.bootstrap.JaegerService; +import io.quarkus.test.bootstrap.RestService; +import io.quarkus.test.scenarios.QuarkusScenario; +import io.quarkus.test.services.JaegerContainer; +import io.quarkus.test.services.QuarkusApplication; + +@QuarkusScenario +public class OpenTelemetryGrpcIT { + + @JaegerContainer(useOtlpCollector = true) + static final JaegerService jaeger = new JaegerService(); + + @QuarkusApplication() + static RestService app = new RestService() + .withProperty("quarkus.application.name", "pingpong") + .withProperty("quarkus.opentelemetry.tracer.exporter.otlp.endpoint", jaeger::getCollectorUrl); + + private static final String PING_ENDPOINT = "/grpc-ping"; + private static final String PONG_ENDPOINT = "/grpc-pong"; + private static final String SAY_PONG_PROTO = "SayPong"; + + @Test + public void testServerClientTrace() throws InterruptedException { + // When calling ping, the rest will invoke also the pong rest endpoint. + given() + .when().get(PING_ENDPOINT) + .then().statusCode(HttpStatus.SC_OK) + .body(containsString("ping pong")); + + // Then both ping and pong rest endpoints should have the same trace Id. + String pingTraceId = given() + .when().get(PING_ENDPOINT + "/lastTraceId") + .then().statusCode(HttpStatus.SC_OK).and().extract().asString(); + + assertTraceIdWithPongService(pingTraceId); + + // Then Jaeger is invoked + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> given() + .when().get(jaeger.getTraceUrl() + "?traceID=" + pingTraceId) + .then().statusCode(HttpStatus.SC_OK) + .and().body(allOf(containsString(PING_ENDPOINT), containsString(SAY_PONG_PROTO)))); + } + + protected void assertTraceIdWithPongService(String expected) { + String pongTraceId = given() + .when().get(PONG_ENDPOINT + "/lastTraceId") + .then().statusCode(HttpStatus.SC_OK).and().extract().asString(); + + assertEquals(expected, pongTraceId); + } + +} diff --git a/monitoring/opentelemetry-reactive/src/test/java/io/quarkus/ts/opentelemetry/reactive/OpenTelemetrySseIT.java b/monitoring/opentelemetry-reactive/src/test/java/io/quarkus/ts/opentelemetry/reactive/OpenTelemetrySseIT.java new file mode 100644 index 000000000..26910a9dc --- /dev/null +++ b/monitoring/opentelemetry-reactive/src/test/java/io/quarkus/ts/opentelemetry/reactive/OpenTelemetrySseIT.java @@ -0,0 +1,64 @@ +package io.quarkus.ts.opentelemetry.reactive; + +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.apache.http.HttpStatus; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.bootstrap.JaegerService; +import io.quarkus.test.bootstrap.RestService; +import io.quarkus.test.scenarios.QuarkusScenario; +import io.quarkus.test.services.JaegerContainer; +import io.quarkus.test.services.QuarkusApplication; + +@QuarkusScenario +public class OpenTelemetrySseIT { + + @JaegerContainer(useOtlpCollector = true) + static final JaegerService jaeger = new JaegerService(); + + @QuarkusApplication() + static RestService app = new RestService() + .withProperty("quarkus.application.name", "pingpong") + .withProperty("quarkus.opentelemetry.tracer.exporter.otlp.endpoint", jaeger::getCollectorUrl); + + private static final String PING_ENDPOINT = "/server-sent-events-ping"; + private static final String PONG_ENDPOINT = "/server-sent-events-pong"; + + @Test + public void testServerClientTrace() throws InterruptedException { + // When calling ping, the rest will invoke also the pong rest endpoint. + given() + .when().get(PING_ENDPOINT) + .then().statusCode(HttpStatus.SC_OK) + .body(containsString("ping pong")); + + // Then both ping and pong rest endpoints should have the same trace Id. + String pingTraceId = given() + .when().get(PING_ENDPOINT + "/lastTraceId") + .then().statusCode(HttpStatus.SC_OK).and().extract().asString(); + + assertTraceIdWithPongService(pingTraceId); + + // Then Jaeger is invoked + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> given() + .when().get(jaeger.getTraceUrl() + "?traceID=" + pingTraceId) + .then().statusCode(HttpStatus.SC_OK) + .and().body(allOf(containsString(PING_ENDPOINT), containsString(PONG_ENDPOINT)))); + } + + protected void assertTraceIdWithPongService(String expected) { + String pongTraceId = given() + .when().get(PONG_ENDPOINT + "/lastTraceId") + .then().statusCode(HttpStatus.SC_OK).and().extract().asString(); + + assertEquals(expected, pongTraceId); + } + +} diff --git a/monitoring/opentelemetry/pom.xml b/monitoring/opentelemetry/pom.xml index 73d512c4c..8a155b933 100644 --- a/monitoring/opentelemetry/pom.xml +++ b/monitoring/opentelemetry/pom.xml @@ -23,14 +23,66 @@ io.quarkus quarkus-resteasy + + io.quarkus + quarkus-resteasy-mutiny + io.quarkus quarkus-opentelemetry + + io.quarkus + quarkus-grpc + io.quarkus.qe quarkus-test-service-jaeger test + + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + + + + generate-code + generate-code-tests + build + + + + + + + + + + skip-tests-on-windows + + + windows + + + + + + maven-surefire-plugin + + true + + + + maven-failsafe-plugin + + true + + + + + + diff --git a/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/grpc/GrpcPingResource.java b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/grpc/GrpcPingResource.java new file mode 100644 index 000000000..14507e813 --- /dev/null +++ b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/grpc/GrpcPingResource.java @@ -0,0 +1,28 @@ +package io.quarkus.ts.opentelemetry.grpc; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import io.quarkus.example.PongRequest; +import io.quarkus.example.PongServiceGrpc; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.ts.opentelemetry.traceable.TraceableResource; + +@Path("/grpc-ping") +public class GrpcPingResource extends TraceableResource { + + @Inject + @GrpcClient("pong") + PongServiceGrpc.PongServiceBlockingStub pongClient; + + @GET + @Produces(MediaType.TEXT_PLAIN) + public String getPing() { + recordTraceId(); + + return "ping " + pongClient.sayPong(PongRequest.newBuilder().build()).getMessage(); + } +} diff --git a/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/grpc/GrpcPongResource.java b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/grpc/GrpcPongResource.java new file mode 100644 index 000000000..d35879fe8 --- /dev/null +++ b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/grpc/GrpcPongResource.java @@ -0,0 +1,34 @@ +package io.quarkus.ts.opentelemetry.grpc; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.jboss.logging.Logger; + +import io.quarkus.example.LastTraceIdRequest; +import io.quarkus.example.PongServiceGrpc; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.ts.opentelemetry.traceable.TraceableResource; + +@Path("/grpc-pong") +public class GrpcPongResource { + + @Inject + @GrpcClient("pong") + PongServiceGrpc.PongServiceBlockingStub pongClient; + + private static final Logger LOG = Logger.getLogger(TraceableResource.class); + + @GET + @Path("/lastTraceId") + @Produces(MediaType.TEXT_PLAIN) + public String getLastTraceId() { + String lastTraceId = pongClient.returnLastTraceId(LastTraceIdRequest.newBuilder().build()).getMessage(); + LOG.info("Recorded trace ID: " + lastTraceId); + return lastTraceId; + } + +} diff --git a/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/grpc/GrpcPongService.java b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/grpc/GrpcPongService.java new file mode 100644 index 000000000..39025a050 --- /dev/null +++ b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/grpc/GrpcPongService.java @@ -0,0 +1,34 @@ +package io.quarkus.ts.opentelemetry.grpc; + +import org.jboss.logmanager.MDC; + +import io.grpc.stub.StreamObserver; +import io.quarkus.example.LastTraceIdReply; +import io.quarkus.example.LastTraceIdRequest; +import io.quarkus.example.PongReply; +import io.quarkus.example.PongRequest; +import io.quarkus.example.PongServiceGrpc; +import io.quarkus.grpc.GrpcService; + +@GrpcService +public class GrpcPongService extends PongServiceGrpc.PongServiceImplBase { + + private String lastTraceId; + + @Override + public void sayPong(PongRequest request, StreamObserver responseObserver) { + lastTraceId = MDC.get("traceId"); + responseObserver.onNext(PongReply.newBuilder().setMessage("pong").build()); + responseObserver.onCompleted(); + } + + @Override + public void returnLastTraceId(LastTraceIdRequest request, StreamObserver responseObserver) { + responseObserver.onNext(LastTraceIdReply.newBuilder().setMessage(getLastTraceId()).build()); + responseObserver.onCompleted(); + } + + public String getLastTraceId() { + return lastTraceId; + } +} diff --git a/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/sse/ServerSentEventsPingResource.java b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/sse/ServerSentEventsPingResource.java new file mode 100644 index 000000000..f698d0266 --- /dev/null +++ b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/sse/ServerSentEventsPingResource.java @@ -0,0 +1,27 @@ +package io.quarkus.ts.opentelemetry.sse; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RestClient; + +import io.quarkus.ts.opentelemetry.traceable.TraceableResource; +import io.smallrye.mutiny.Multi; + +@Path("/server-sent-events-ping") +public class ServerSentEventsPingResource extends TraceableResource { + + @Inject + @RestClient + ServerSentEventsPongClient pongClient; + + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public Multi getPing() { + recordTraceId(); + return pongClient.getPong().map(response -> "ping " + response); + } +} diff --git a/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/sse/ServerSentEventsPongClient.java b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/sse/ServerSentEventsPongClient.java new file mode 100644 index 000000000..9da4cad17 --- /dev/null +++ b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/sse/ServerSentEventsPongClient.java @@ -0,0 +1,19 @@ +package io.quarkus.ts.opentelemetry.sse; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; + +import io.smallrye.mutiny.Multi; + +@RegisterRestClient +public interface ServerSentEventsPongClient { + @GET + @Path("/server-sent-events-pong") + @Produces(MediaType.SERVER_SENT_EVENTS) + Multi getPong(); + +} diff --git a/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/sse/ServerSentEventsPongResource.java b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/sse/ServerSentEventsPongResource.java new file mode 100644 index 000000000..41b8e6485 --- /dev/null +++ b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/sse/ServerSentEventsPongResource.java @@ -0,0 +1,20 @@ +package io.quarkus.ts.opentelemetry.sse; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import io.quarkus.ts.opentelemetry.traceable.TraceableResource; +import io.smallrye.mutiny.Multi; + +@Path("/server-sent-events-pong") +public class ServerSentEventsPongResource extends TraceableResource { + + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public Multi getPong() { + recordTraceId(); + return Multi.createFrom().item("pong"); + } +} diff --git a/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/traceable/TraceableResource.java b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/traceable/TraceableResource.java new file mode 100644 index 000000000..cd852b8aa --- /dev/null +++ b/monitoring/opentelemetry/src/main/java/io/quarkus/ts/opentelemetry/traceable/TraceableResource.java @@ -0,0 +1,28 @@ +package io.quarkus.ts.opentelemetry.traceable; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.jboss.logging.Logger; +import org.jboss.logmanager.MDC; + +public abstract class TraceableResource { + + private static final Logger LOG = Logger.getLogger(TraceableResource.class); + + private String lastTraceId; + + @GET + @Path("/lastTraceId") + @Produces(MediaType.TEXT_PLAIN) + public String getLastTraceId() { + return lastTraceId; + } + + protected void recordTraceId() { + lastTraceId = MDC.get("traceId"); + LOG.info("Recorded trace ID: " + lastTraceId); + } +} diff --git a/monitoring/opentelemetry/src/main/proto/pong.proto b/monitoring/opentelemetry/src/main/proto/pong.proto new file mode 100644 index 000000000..a62e5faec --- /dev/null +++ b/monitoring/opentelemetry/src/main/proto/pong.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.quarkus.example"; +option java_outer_classname = "PongProto"; + +package io.quarkus.example; + +service PongService { + rpc SayPong (PongRequest) returns (PongReply) {} + rpc ReturnLastTraceId (LastTraceIdRequest) returns (LastTraceIdReply) {} +} + +message PongRequest { +} + +message PongReply { + string message = 1; +} + +message LastTraceIdRequest { +} + +message LastTraceIdReply { + string message = 1; +} \ No newline at end of file diff --git a/monitoring/opentelemetry/src/main/resources/application.properties b/monitoring/opentelemetry/src/main/resources/application.properties index 4067be4b8..9fc09e8ce 100644 --- a/monitoring/opentelemetry/src/main/resources/application.properties +++ b/monitoring/opentelemetry/src/main/resources/application.properties @@ -1,3 +1,10 @@ io.quarkus.ts.opentelemetry.PingPongService/mp-rest/url=${pongservice_url}:${pongservice_port} io.quarkus.ts.opentelemetry.PingPongService/mp-rest/scope=javax.inject.Singleton +io.quarkus.ts.opentelemetry.sse.ServerSentEventsPongClient/mp-rest/url=http://localhost:${quarkus.http.port} +io.quarkus.ts.opentelemetry.sse.ServerSentEventsPongClient/mp-rest/scope=javax.inject.Singleton + +# gRPC +quarkus.grpc.clients.pong.host=localhost + +quarkus.application.name=pingpong \ No newline at end of file diff --git a/monitoring/opentelemetry/src/test/java/io/quarkus/ts/opentelemetry/OpenTelemetryGrpcIT.java b/monitoring/opentelemetry/src/test/java/io/quarkus/ts/opentelemetry/OpenTelemetryGrpcIT.java new file mode 100644 index 000000000..ce401b4cf --- /dev/null +++ b/monitoring/opentelemetry/src/test/java/io/quarkus/ts/opentelemetry/OpenTelemetryGrpcIT.java @@ -0,0 +1,65 @@ +package io.quarkus.ts.opentelemetry; + +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.apache.http.HttpStatus; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.bootstrap.JaegerService; +import io.quarkus.test.bootstrap.RestService; +import io.quarkus.test.scenarios.QuarkusScenario; +import io.quarkus.test.services.JaegerContainer; +import io.quarkus.test.services.QuarkusApplication; + +@QuarkusScenario +public class OpenTelemetryGrpcIT { + + @JaegerContainer(useOtlpCollector = true) + static final JaegerService jaeger = new JaegerService(); + + @QuarkusApplication() + static RestService app = new RestService() + .withProperty("quarkus.application.name", "pingpong") + .withProperty("quarkus.opentelemetry.tracer.exporter.otlp.endpoint", jaeger::getCollectorUrl); + + private static final String PING_ENDPOINT = "/grpc-ping"; + private static final String PONG_ENDPOINT = "/grpc-pong"; + private static final String SAY_PONG_PROTO = "SayPong"; + + @Test + public void testServerClientTrace() throws InterruptedException { + // When calling ping, the rest will invoke also the pong rest endpoint. + given() + .when().get(PING_ENDPOINT) + .then().statusCode(HttpStatus.SC_OK) + .body(containsString("ping pong")); + + // Then both ping and pong rest endpoints should have the same trace Id. + String pingTraceId = given() + .when().get(PING_ENDPOINT + "/lastTraceId") + .then().statusCode(HttpStatus.SC_OK).and().extract().asString(); + + assertTraceIdWithPongService(pingTraceId); + + // Then Jaeger is invoked + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> given() + .when().get(jaeger.getTraceUrl() + "?traceID=" + pingTraceId) + .then().statusCode(HttpStatus.SC_OK) + .and().body(allOf(containsString(PING_ENDPOINT), containsString(SAY_PONG_PROTO)))); + } + + protected void assertTraceIdWithPongService(String expected) { + String pongTraceId = given() + .when().get(PONG_ENDPOINT + "/lastTraceId") + .then().statusCode(HttpStatus.SC_OK).and().extract().asString(); + + assertEquals(expected, pongTraceId); + } + +} diff --git a/monitoring/opentelemetry/src/test/java/io/quarkus/ts/opentelemetry/OpenTelemetrySseIT.java b/monitoring/opentelemetry/src/test/java/io/quarkus/ts/opentelemetry/OpenTelemetrySseIT.java new file mode 100644 index 000000000..2b0b63b35 --- /dev/null +++ b/monitoring/opentelemetry/src/test/java/io/quarkus/ts/opentelemetry/OpenTelemetrySseIT.java @@ -0,0 +1,70 @@ +package io.quarkus.ts.opentelemetry; + +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.core.MediaType; + +import org.apache.http.HttpStatus; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.bootstrap.JaegerService; +import io.quarkus.test.bootstrap.RestService; +import io.quarkus.test.scenarios.QuarkusScenario; +import io.quarkus.test.services.JaegerContainer; +import io.quarkus.test.services.QuarkusApplication; + +@QuarkusScenario +@Disabled("Input from Clement: RESTEasy classic and SSE is barely working, the fact that RESTEasy classic requires a worker " + + "thread can lead to very annoying issue, we recommend to switch to RESTEasy reactive") +public class OpenTelemetrySseIT { + + @JaegerContainer(useOtlpCollector = true) + static final JaegerService jaeger = new JaegerService(); + + @QuarkusApplication() + static RestService app = new RestService() + .withProperty("quarkus.application.name", "pingpong") + .withProperty("quarkus.opentelemetry.tracer.exporter.otlp.endpoint", jaeger::getCollectorUrl); + + private static final String PING_ENDPOINT = "/server-sent-events-ping"; + private static final String PONG_ENDPOINT = "/server-sent-events-pong"; + + @Test + public void testServerClientTrace() throws InterruptedException { + // When calling ping, the rest will invoke also the pong rest endpoint. + given() + .when().get(PING_ENDPOINT) + .then().statusCode(HttpStatus.SC_OK) + .contentType(MediaType.SERVER_SENT_EVENTS) + .body(containsString("ping pong")); + + // Then both ping and pong rest endpoints should have the same trace Id. + String pingTraceId = given() + .when().get(PING_ENDPOINT + "/lastTraceId") + .then().statusCode(HttpStatus.SC_OK).and().extract().asString(); + + assertTraceIdWithPongService(pingTraceId); + + // Then Jaeger is invoked + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> given() + .when().get(jaeger.getTraceUrl() + "?traceID=" + pingTraceId) + .then().statusCode(HttpStatus.SC_OK) + .and().body(allOf(containsString(PING_ENDPOINT), containsString(PONG_ENDPOINT)))); + } + + protected void assertTraceIdWithPongService(String expected) { + String pongTraceId = given() + .when().get(PONG_ENDPOINT + "/lastTraceId") + .then().statusCode(HttpStatus.SC_OK).and().extract().asString(); + + assertEquals(expected, pongTraceId); + } + +}