Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC and SSE coverage for OpenTelemetry #999

Merged
merged 1 commit into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions monitoring/opentelemetry-reactive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,58 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus.qe</groupId>
<artifactId>quarkus-test-service-jaeger</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<!-- Skipped on Windows as does not support Linux Containers / Testcontainers -->
<profile>
<id>skip-tests-on-windows</id>
<activation>
<os>
<family>windows</family>
</os>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkus.ts.opentelemetry.reactive.grpc;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.quarkus.example.PongRequest;
import io.quarkus.example.PongServiceGrpc;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource;

@Path("/grpc-ping")
public class GrpcPingResource extends TraceableResource {

@Inject
@GrpcClient("pong")
PongServiceGrpc.PongServiceBlockingStub pongClient;

@GET
@Produces(MediaType.TEXT_PLAIN)
public String getPing() {
recordTraceId();

return "ping " + pongClient.sayPong(PongRequest.newBuilder().build()).getMessage();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.quarkus.ts.opentelemetry.reactive.grpc;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.logging.Logger;

import io.quarkus.example.LastTraceIdRequest;
import io.quarkus.example.PongServiceGrpc;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource;

@Path("/grpc-pong")
public class GrpcPongResource {

@Inject
@GrpcClient("pong")
PongServiceGrpc.PongServiceBlockingStub pongClient;

private static final Logger LOG = Logger.getLogger(TraceableResource.class);

@GET
@Path("/lastTraceId")
@Produces(MediaType.TEXT_PLAIN)
public String getLastTraceId() {
String lastTraceId = pongClient.returnLastTraceId(LastTraceIdRequest.newBuilder().build()).getMessage();
LOG.info("Recorded trace ID: " + lastTraceId);
return lastTraceId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.quarkus.ts.opentelemetry.reactive.grpc;

import org.jboss.logmanager.MDC;

import io.grpc.stub.StreamObserver;
import io.quarkus.example.LastTraceIdReply;
import io.quarkus.example.LastTraceIdRequest;
import io.quarkus.example.PongReply;
import io.quarkus.example.PongRequest;
import io.quarkus.example.PongServiceGrpc;
import io.quarkus.grpc.GrpcService;

@GrpcService
public class GrpcPongService extends PongServiceGrpc.PongServiceImplBase {

private String lastTraceId;

@Override
public void sayPong(PongRequest request, StreamObserver<PongReply> responseObserver) {
lastTraceId = MDC.get("traceId");
responseObserver.onNext(PongReply.newBuilder().setMessage("pong").build());
responseObserver.onCompleted();
}

@Override
public void returnLastTraceId(LastTraceIdRequest request, StreamObserver<LastTraceIdReply> responseObserver) {
responseObserver.onNext(LastTraceIdReply.newBuilder().setMessage(getLastTraceId()).build());
responseObserver.onCompleted();
}

public String getLastTraceId() {
return lastTraceId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.quarkus.ts.opentelemetry.reactive.sse;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource;
import io.smallrye.mutiny.Multi;

@Path("/server-sent-events-ping")
public class ServerSentEventsPingResource extends TraceableResource {

@Inject
@RestClient
ServerSentEventsPongClient pongClient;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> getPing() {
recordTraceId();
return pongClient.getPong().map(response -> "ping " + response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.ts.opentelemetry.reactive.sse;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;

import io.smallrye.mutiny.Multi;

@RegisterRestClient
public interface ServerSentEventsPongClient {
@GET
@Path("/server-sent-events-pong")
@Produces(MediaType.SERVER_SENT_EVENTS)
Multi<String> getPong();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.ts.opentelemetry.reactive.sse;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource;
import io.smallrye.mutiny.Multi;

@Path("/server-sent-events-pong")
public class ServerSentEventsPongResource extends TraceableResource {

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> getPong() {
recordTraceId();
return Multi.createFrom().item("pong");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkus.ts.opentelemetry.reactive.traceable;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.logging.Logger;
import org.jboss.logmanager.MDC;

public abstract class TraceableResource {

private static final Logger LOG = Logger.getLogger(TraceableResource.class);

private String lastTraceId;

@GET
@Path("/lastTraceId")
@Produces(MediaType.TEXT_PLAIN)
public String getLastTraceId() {
return lastTraceId;
}

protected void recordTraceId() {
lastTraceId = MDC.get("traceId");
LOG.info("Recorded trace ID: " + lastTraceId);
}
}
26 changes: 26 additions & 0 deletions monitoring/opentelemetry-reactive/src/main/proto/pong.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.quarkus.example";
option java_outer_classname = "PongProto";

package io.quarkus.example;

service PongService {
rpc SayPong (PongRequest) returns (PongReply) {}
rpc ReturnLastTraceId (LastTraceIdRequest) returns (LastTraceIdReply) {}
}

message PongRequest {
}

message PongReply {
string message = 1;
}

message LastTraceIdRequest {
}

message LastTraceIdReply {
string message = 1;
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
io.quarkus.ts.opentelemetry.reactive.PingPongService/mp-rest/url=${pongservice_url}:${pongservice_port}
io.quarkus.ts.opentelemetry.reactive.PingPongService/mp-rest/scope=javax.inject.Singleton

io.quarkus.ts.opentelemetry.reactive.sse.ServerSentEventsPongClient/mp-rest/url=http://localhost:${quarkus.http.port}
io.quarkus.ts.opentelemetry.reactive.sse.ServerSentEventsPongClient/mp-rest/scope=javax.inject.Singleton

# gRPC
quarkus.grpc.clients.pong.host=localhost

quarkus.application.name=pingpong
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.quarkus.ts.opentelemetry.reactive;

import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.concurrent.TimeUnit;

import org.apache.http.HttpStatus;
import org.junit.jupiter.api.Test;

import io.quarkus.test.bootstrap.JaegerService;
import io.quarkus.test.bootstrap.RestService;
import io.quarkus.test.scenarios.QuarkusScenario;
import io.quarkus.test.services.JaegerContainer;
import io.quarkus.test.services.QuarkusApplication;

@QuarkusScenario
public class OpenTelemetryGrpcIT {

@JaegerContainer(useOtlpCollector = true)
static final JaegerService jaeger = new JaegerService();

@QuarkusApplication()
static RestService app = new RestService()
.withProperty("quarkus.application.name", "pingpong")
.withProperty("quarkus.opentelemetry.tracer.exporter.otlp.endpoint", jaeger::getCollectorUrl);

private static final String PING_ENDPOINT = "/grpc-ping";
private static final String PONG_ENDPOINT = "/grpc-pong";
private static final String SAY_PONG_PROTO = "SayPong";

@Test
public void testServerClientTrace() throws InterruptedException {
// When calling ping, the rest will invoke also the pong rest endpoint.
given()
.when().get(PING_ENDPOINT)
.then().statusCode(HttpStatus.SC_OK)
.body(containsString("ping pong"));

// Then both ping and pong rest endpoints should have the same trace Id.
String pingTraceId = given()
.when().get(PING_ENDPOINT + "/lastTraceId")
.then().statusCode(HttpStatus.SC_OK).and().extract().asString();

assertTraceIdWithPongService(pingTraceId);

// Then Jaeger is invoked
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> given()
.when().get(jaeger.getTraceUrl() + "?traceID=" + pingTraceId)
.then().statusCode(HttpStatus.SC_OK)
.and().body(allOf(containsString(PING_ENDPOINT), containsString(SAY_PONG_PROTO))));
}

protected void assertTraceIdWithPongService(String expected) {
String pongTraceId = given()
.when().get(PONG_ENDPOINT + "/lastTraceId")
.then().statusCode(HttpStatus.SC_OK).and().extract().asString();

assertEquals(expected, pongTraceId);
}

}
Loading