Skip to content

Commit

Permalink
Merge pull request #24117 from radcortez/opentelemetry-grpc
Browse files Browse the repository at this point in the history
OpenTelemetry gRPC support
  • Loading branch information
cescoffier authored Mar 13, 2022
2 parents be471ae + aa5c3bd commit b46fdbb
Show file tree
Hide file tree
Showing 22 changed files with 1,140 additions and 0 deletions.
41 changes: 41 additions & 0 deletions extensions/opentelemetry/opentelemetry/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<artifactId>quarkus-smallrye-health-deployment</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc-deployment</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
Expand Down Expand Up @@ -121,7 +126,43 @@
</dependencies>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>${os-maven-plugin.version}</version>
</extension>
</extensions>

<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>quarkus-grpc-protoc-plugin</id>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc-protoc-plugin</artifactId>
<version>${project.version}</version>
<mainClass>io.quarkus.grpc.protoc.plugin.MutinyGrpcGenerator</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
<executions>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import io.quarkus.opentelemetry.runtime.tracing.TracerProducer;
import io.quarkus.opentelemetry.runtime.tracing.TracerRecorder;
import io.quarkus.opentelemetry.runtime.tracing.TracerRuntimeConfig;
import io.quarkus.opentelemetry.runtime.tracing.grpc.GrpcTracingClientInterceptor;
import io.quarkus.opentelemetry.runtime.tracing.grpc.GrpcTracingServerInterceptor;
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.runtime.configuration.NormalizeRootHttpPathConverter;
import io.quarkus.vertx.core.deployment.VertxOptionsConsumerBuildItem;
Expand Down Expand Up @@ -74,6 +76,16 @@ public boolean getAsBoolean() {
}
}

static class GrpcExtensionAvailable implements BooleanSupplier {
private static final boolean IS_GRPC_EXTENSION_AVAILABLE = isClassPresent(
"io.quarkus.grpc.runtime.GrpcServerRecorder");

@Override
public boolean getAsBoolean() {
return IS_GRPC_EXTENSION_AVAILABLE;
}
}

@BuildStep(onlyIf = TracerEnabled.class)
UnremovableBeanBuildItem ensureProducersAreRetained(
CombinedIndexBuildItem indexBuildItem,
Expand Down Expand Up @@ -167,6 +179,12 @@ void dropNames(Optional<StaticResourcesBuildItem> staticResources,
dropStaticResources.produce(new DropStaticResourcesBuildItem(resources));
}

@BuildStep(onlyIf = { TracerEnabled.class, GrpcExtensionAvailable.class })
void grpcTracers(BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
additionalBeans.produce(new AdditionalBeanBuildItem(GrpcTracingServerInterceptor.class));
additionalBeans.produce(new AdditionalBeanBuildItem(GrpcTracingClientInterceptor.class));
}

@BuildStep(onlyIf = TracerEnabled.class)
@Record(ExecutionTime.STATIC_INIT)
VertxOptionsConsumerBuildItem vertxTracingOptions(TracerRecorder recorder) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package io.quarkus.opentelemetry.deployment;

import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_IP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_PORT;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_TRANSPORT;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.RPC_GRPC_STATUS_CODE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.RPC_METHOD;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.RPC_SERVICE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.RPC_SYSTEM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.List;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.extension.annotations.WithSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.GrpcService;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;

public class GrpcOpenTelemetryTest {
@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(TestSpanExporter.class)
.addClasses(HelloService.class)
.addClasses(GreeterGrpc.class, MutinyGreeterGrpc.class,
Greeter.class, GreeterBean.class, GreeterClient.class,
HelloProto.class, HelloRequest.class, HelloRequestOrBuilder.class,
HelloReply.class, HelloReplyOrBuilder.class)
.addClasses(StreamService.class)
.addClasses(StreamingGrpc.class, MutinyStreamingGrpc.class,
Streaming.class, StreamingBean.class, StreamingClient.class,
StreamingProto.class, Item.class, ItemOrBuilder.class));

@Inject
TestSpanExporter spanExporter;
@GrpcClient
MutinyGreeterGrpc.MutinyGreeterStub greeterStub;
@Inject
HelloBean helloBean;

@AfterEach
void tearDown() {
spanExporter.reset();
}

@Test
void grpc() {
String response = greeterStub.sayHello(
HelloRequest.newBuilder().setName("Naruto").build())
.map(HelloReply::getMessage)
.await().atMost(Duration.ofSeconds(5));
assertEquals("Hello Naruto", response);

List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertEquals(2, spans.size());

SpanData server = spans.get(0);
assertEquals("helloworld.Greeter/SayHello", server.getName());
assertEquals(SpanKind.SERVER, server.getKind());
assertEquals("grpc", server.getAttributes().get(RPC_SYSTEM));
assertEquals("helloworld.Greeter", server.getAttributes().get(RPC_SERVICE));
assertEquals("SayHello", server.getAttributes().get(RPC_METHOD));
assertEquals(Status.Code.OK.value(), server.getAttributes().get(RPC_GRPC_STATUS_CODE));
assertNotNull(server.getAttributes().get(NET_PEER_IP));
assertNotNull(server.getAttributes().get(NET_PEER_PORT));
assertEquals("ip_tcp", server.getAttributes().get(NET_TRANSPORT));

SpanData client = spans.get(1);
assertEquals("helloworld.Greeter/SayHello", client.getName());
assertEquals(SpanKind.CLIENT, client.getKind());
assertEquals("grpc", client.getAttributes().get(RPC_SYSTEM));
assertEquals("helloworld.Greeter", client.getAttributes().get(RPC_SERVICE));
assertEquals("SayHello", client.getAttributes().get(RPC_METHOD));
assertEquals(Status.Code.OK.value(), client.getAttributes().get(RPC_GRPC_STATUS_CODE));

assertEquals(server.getTraceId(), client.getTraceId());
}

@Test
void error() {
try {
greeterStub.sayHello(HelloRequest.newBuilder().setName("error").build())
.map(HelloReply::getMessage)
.await()
.atMost(Duration.ofSeconds(5));
fail();
} catch (Exception e) {
assertTrue(true);
}

List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertEquals(2, spans.size());

SpanData server = spans.get(0);
assertEquals("helloworld.Greeter/SayHello", server.getName());
assertEquals(SpanKind.SERVER, server.getKind());
assertEquals("grpc", server.getAttributes().get(RPC_SYSTEM));
assertEquals("helloworld.Greeter", server.getAttributes().get(RPC_SERVICE));
assertEquals("SayHello", server.getAttributes().get(RPC_METHOD));
assertEquals(Status.Code.UNKNOWN.value(), server.getAttributes().get(RPC_GRPC_STATUS_CODE));
assertNotNull(server.getAttributes().get(NET_PEER_IP));
assertNotNull(server.getAttributes().get(NET_PEER_PORT));
assertEquals("ip_tcp", server.getAttributes().get(NET_TRANSPORT));

SpanData client = spans.get(1);
assertEquals("helloworld.Greeter/SayHello", client.getName());
assertEquals(SpanKind.CLIENT, client.getKind());
assertEquals("grpc", client.getAttributes().get(RPC_SYSTEM));
assertEquals("helloworld.Greeter", client.getAttributes().get(RPC_SERVICE));
assertEquals("SayHello", client.getAttributes().get(RPC_METHOD));
assertEquals(Status.Code.UNKNOWN.value(), server.getAttributes().get(RPC_GRPC_STATUS_CODE));

assertEquals(server.getTraceId(), client.getTraceId());
}

@Test
void withCdi() {
assertEquals("Hello Naruto", helloBean.hello("Naruto"));

List<SpanData> spans = spanExporter.getFinishedSpanItems(3);
assertEquals(3, spans.size());

assertEquals(spans.get(0).getTraceId(), spans.get(1).getTraceId());
assertEquals(spans.get(0).getTraceId(), spans.get(2).getTraceId());
}

@GrpcService
public static class HelloService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(final HelloRequest request, final StreamObserver<HelloReply> responseObserver) {
if (request.getName().equals("error")) {
responseObserver.onError(new RuntimeException());
return;
}

responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
responseObserver.onCompleted();
}
}

@ApplicationScoped
public static class HelloBean {
@GrpcClient
Greeter greeter;

// TODO - radcortez - how to propagate the context if this is a Uni?
@WithSpan
public String hello(String name) {
return greeter.sayHello(HelloRequest.newBuilder().setName(name).build())
.onItem()
.transform(HelloReply::getMessage)
.await()
.atMost(Duration.ofSeconds(5));
}
}

@GrpcClient
MutinyStreamingGrpc.MutinyStreamingStub streamingStub;

@Test
void stream() {
Multi<Item> request = Multi.createFrom().items(item("Goku"), item("Vegeta"), item("Piccolo"), item("Beerus"),
item("Whis"));
Multi<Item> response = streamingStub.pipe(request);

List<String> items = response.map(Item::getMessage).collect().asList().await().atMost(Duration.ofSeconds(5));
assertTrue(items.contains("Hello Goku"));
assertTrue(items.contains("Hello Vegeta"));
assertTrue(items.contains("Hello Piccolo"));
assertTrue(items.contains("Hello Beerus"));
assertTrue(items.contains("Hello Whis"));

List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertEquals(2, spans.size());

SpanData server = spans.get(0);
assertEquals("streaming.Streaming/Pipe", server.getName());
assertEquals(SpanKind.SERVER, server.getKind());
assertEquals("grpc", server.getAttributes().get(RPC_SYSTEM));
assertEquals("streaming.Streaming", server.getAttributes().get(RPC_SERVICE));
assertEquals("Pipe", server.getAttributes().get(RPC_METHOD));
assertEquals(Status.Code.OK.value(), server.getAttributes().get(RPC_GRPC_STATUS_CODE));
assertNotNull(server.getAttributes().get(NET_PEER_IP));
assertNotNull(server.getAttributes().get(NET_PEER_PORT));
assertEquals("ip_tcp", server.getAttributes().get(NET_TRANSPORT));

SpanData client = spans.get(1);
assertEquals("streaming.Streaming/Pipe", client.getName());
assertEquals(SpanKind.CLIENT, client.getKind());
assertEquals("grpc", client.getAttributes().get(RPC_SYSTEM));
assertEquals("streaming.Streaming", client.getAttributes().get(RPC_SERVICE));
assertEquals("Pipe", client.getAttributes().get(RPC_METHOD));
assertEquals(Status.Code.OK.value(), client.getAttributes().get(RPC_GRPC_STATUS_CODE));

assertEquals(server.getTraceId(), client.getTraceId());
}

@GrpcService
public static class StreamService implements Streaming {
@Override
public Multi<Item> pipe(final Multi<Item> request) {
return request.onItem().transform(item -> item("Hello " + item.getMessage()));
}
}

private static Item item(final String message) {
return Item.newBuilder().setMessage(message).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.quarkus.opentelemetry.deployment";
option java_outer_classname = "HelloProto";

package helloworld;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello(HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.quarkus.opentelemetry.deployment";
option java_outer_classname = "StreamingProto";

package streaming;

service Streaming {
rpc Pipe(stream Item) returns (stream Item) {}
}

message Item {
string message = 1;
}
5 changes: 5 additions & 0 deletions extensions/opentelemetry/opentelemetry/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>quarkus-rest-client</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.smallrye.common</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.opentelemetry.runtime.tracing.grpc;

import io.grpc.Status;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcAttributesExtractor;

class GrpcAttributesExtractor extends RpcAttributesExtractor<GrpcRequest, Status> {
@Override
protected String system(final GrpcRequest grpcRequest) {
return "grpc";
}

@Override
protected String service(final GrpcRequest grpcRequest) {
return grpcRequest.getMethodDescriptor().getServiceName();
}

@Override
protected String method(final GrpcRequest grpcRequest) {
return grpcRequest.getMethodDescriptor().getBareMethodName();
}
}
Loading

0 comments on commit b46fdbb

Please sign in to comment.