Skip to content

Commit

Permalink
gRPC Dev UI - fix problems when reusing the existing HTTP server
Browse files Browse the repository at this point in the history
- i.e. when quarkus.grpc.server.use-separate-server=false is used
- fixes #30244
  • Loading branch information
mkouba committed Jan 10, 2023
1 parent f7f159e commit b28201e
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@
import io.quarkus.grpc.deployment.DelegatingGrpcBeanBuildItem;
import io.quarkus.grpc.deployment.GrpcDotNames;
import io.quarkus.grpc.protoc.plugin.MutinyGrpcGenerator;
import io.quarkus.grpc.runtime.config.GrpcConfiguration;
import io.quarkus.grpc.runtime.devmode.CollectStreams;
import io.quarkus.grpc.runtime.devmode.DelegatingGrpcBeansStorage;
import io.quarkus.grpc.runtime.devmode.GrpcDevConsoleRecorder;
import io.quarkus.grpc.runtime.devmode.GrpcServices;
import io.quarkus.grpc.runtime.devmode.StreamCollectorInterceptor;
import io.quarkus.vertx.http.runtime.HttpConfiguration;

public class GrpcDevConsoleProcessor {

Expand Down Expand Up @@ -145,8 +147,9 @@ public void collectMessagePrototypes(CombinedIndexBuildItem index,
@Consume(RuntimeConfigSetupCompleteBuildItem.class)
@Record(ExecutionTime.RUNTIME_INIT)
@BuildStep(onlyIf = IsDevelopment.class)
public DevConsoleRouteBuildItem createWebSocketEndpoint(GrpcDevConsoleRecorder recorder) {
recorder.setServerConfiguration();
public DevConsoleRouteBuildItem createWebSocketEndpoint(GrpcDevConsoleRecorder recorder,
HttpConfiguration httpConfiguration, GrpcConfiguration grpcConfiguration) {
recorder.setServerConfiguration(httpConfiguration, grpcConfiguration);
return DevConsoleRouteBuildItem.builder().path("grpc-test").method("GET").handler(recorder.handler()).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,33 +221,40 @@ public void init() {
if (serviceDescriptors != null) {
return;
}
if (serverConfig == null) {
log.warnf("Unable to initialize client stubs for gRPC Dev UI - server config not found");
return;
}
if (Boolean.TRUE.equals(serverConfig.get("ssl"))) {
log.warnf("Unable to initialize client stubs for gRPC Dev UI - SSL is not supported");
return;
}

serviceDescriptors = new HashMap<>();
grpcClientStubs = new HashMap<>();
try {
if (serverConfig == null || Boolean.FALSE.equals(serverConfig.get("ssl"))) {
for (Class<?> grpcServiceClass : grpcServices) {

Method method = grpcServiceClass.getDeclaredMethod("getServiceDescriptor");
ServiceDescriptor serviceDescriptor = (ServiceDescriptor) method.invoke(null);
serviceDescriptors.put(serviceDescriptor.getName(), serviceDescriptor);

// TODO more config options
Channel channel = NettyChannelBuilder
.forAddress(serverConfig.get("host").toString(), (Integer) serverConfig.get("port"))
.usePlaintext()
.build();
Method stubFactoryMethod;

try {
stubFactoryMethod = grpcServiceClass.getDeclaredMethod("newStub", Channel.class);
} catch (NoSuchMethodException e) {
log.warnf("Ignoring gRPC service - newStub() method not declared on %s", grpcServiceClass);
continue;
}
for (Class<?> grpcServiceClass : grpcServices) {

Method method = grpcServiceClass.getDeclaredMethod("getServiceDescriptor");
ServiceDescriptor serviceDescriptor = (ServiceDescriptor) method.invoke(null);
serviceDescriptors.put(serviceDescriptor.getName(), serviceDescriptor);

// TODO more config options
Channel channel = NettyChannelBuilder
.forAddress(serverConfig.get("host").toString(), (Integer) serverConfig.get("port"))
.usePlaintext()
.build();
Method stubFactoryMethod;

Object stub = stubFactoryMethod.invoke(null, channel);
grpcClientStubs.put(serviceDescriptor.getName(), stub);
try {
stubFactoryMethod = grpcServiceClass.getDeclaredMethod("newStub", Channel.class);
} catch (NoSuchMethodException e) {
log.warnf("Ignoring gRPC service - newStub() method not declared on %s", grpcServiceClass);
continue;
}

Object stub = stubFactoryMethod.invoke(null, channel);
grpcClientStubs.put(serviceDescriptor.getName(), stub);
}
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
throw new IllegalStateException("Unable to initialize client stubs for gRPC Dev UI");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.quarkus.arc.Arc;
import io.quarkus.arc.InstanceHandle;
import io.quarkus.arc.Subclass;
import io.quarkus.grpc.runtime.config.GrpcConfiguration;
import io.quarkus.grpc.runtime.config.GrpcServerConfiguration;
Expand Down Expand Up @@ -173,6 +174,8 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration,
}
});
shutdown.addShutdownTask(route::remove); // remove this route at shutdown, this should reset it

initHealthStorage();
}

// TODO -- handle Avro, plain text ... when supported / needed
Expand Down Expand Up @@ -225,12 +228,15 @@ private void postStartup(GrpcServerConfiguration configuration, boolean test) {
}

private void initHealthStorage() {
GrpcHealthStorage storage = Arc.container().instance(GrpcHealthStorage.class).get();
storage.setStatus(GrpcHealthStorage.DEFAULT_SERVICE_NAME,
HealthOuterClass.HealthCheckResponse.ServingStatus.SERVING);
for (GrpcServiceDefinition service : services) {
storage.setStatus(service.definition.getServiceDescriptor().getName(),
InstanceHandle<GrpcHealthStorage> storageHandle = Arc.container().instance(GrpcHealthStorage.class);
if (storageHandle.isAvailable()) {
GrpcHealthStorage storage = storageHandle.get();
storage.setStatus(GrpcHealthStorage.DEFAULT_SERVICE_NAME,
HealthOuterClass.HealthCheckResponse.ServingStatus.SERVING);
for (GrpcServiceDefinition service : services) {
storage.setStatus(service.definition.getServiceDescriptor().getName(),
HealthOuterClass.HealthCheckResponse.ServingStatus.SERVING);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

import org.jboss.logging.Logger;

import io.quarkus.arc.Arc;
import io.quarkus.arc.InstanceHandle;
import io.quarkus.dev.console.DevConsoleManager;
import io.quarkus.dev.testing.GrpcWebSocketProxy;
import io.quarkus.grpc.runtime.config.GrpcConfiguration;
import io.quarkus.grpc.runtime.config.GrpcServerConfiguration;
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.vertx.http.runtime.HttpConfiguration;
import io.quarkus.vertx.http.runtime.HttpConfiguration.InsecureRequests;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
Expand All @@ -20,17 +20,22 @@

@Recorder
public class GrpcDevConsoleRecorder {
private static final Logger log = Logger.getLogger(GrpcDevConsoleRecorder.class);

public void setServerConfiguration() {
try (InstanceHandle<GrpcConfiguration> config = Arc.container().instance(GrpcConfiguration.class)) {
GrpcServerConfiguration serverConfig = config.get().server;
Map<String, Object> map = new HashMap<>();
private static final Logger LOG = Logger.getLogger(GrpcDevConsoleRecorder.class);

public void setServerConfiguration(HttpConfiguration httpConfiguration, GrpcConfiguration grpcConfiguration) {
GrpcServerConfiguration serverConfig = grpcConfiguration.server;
Map<String, Object> map = new HashMap<>();
if (serverConfig.useSeparateServer) {
map.put("host", serverConfig.host);
map.put("port", serverConfig.port);
map.put("ssl", serverConfig.ssl.certificate.isPresent() || serverConfig.ssl.keyStore.isPresent());
DevConsoleManager.setGlobal("io.quarkus.grpc.serverConfig", map);
} else {
map.put("host", httpConfiguration.host);
map.put("port", httpConfiguration.port);
map.put("ssl", httpConfiguration.insecureRequests != InsecureRequests.ENABLED);
}
DevConsoleManager.setGlobal("io.quarkus.grpc.serverConfig", map);
}

public Handler<RoutingContext> handler() {
Expand All @@ -39,12 +44,12 @@ public Handler<RoutingContext> handler() {
public void handle(RoutingContext context) {
context.request().toWebSocket(webSocket -> {
if (webSocket.failed()) {
log.error("failed to connect web socket", webSocket.cause());
LOG.error("failed to connect web socket", webSocket.cause());
} else {
ServerWebSocket serverWebSocket = webSocket.result();
Integer socketId = GrpcWebSocketProxy.addWebSocket(
message -> serverWebSocket.writeTextMessage(message)
.onFailure(e -> log
.onFailure(e -> LOG
.info("failed to send back message to the gRPC Dev Console WebSocket", e)),
runnable -> {
if (!serverWebSocket.isClosed()) {
Expand All @@ -60,7 +65,7 @@ public void handle(AsyncResult<Void> event) {
});

if (socketId == null) {
log.error("No gRPC dev console WebSocketListener");
LOG.error("No gRPC dev console WebSocketListener");
serverWebSocket.close();
return;
}
Expand Down

0 comments on commit b28201e

Please sign in to comment.