diff --git a/core/devmode-spi/src/main/java/io/quarkus/dev/testing/GrpcWebSocketProxy.java b/core/devmode-spi/src/main/java/io/quarkus/dev/testing/GrpcWebSocketProxy.java new file mode 100644 index 0000000000000..d55876d528576 --- /dev/null +++ b/core/devmode-spi/src/main/java/io/quarkus/dev/testing/GrpcWebSocketProxy.java @@ -0,0 +1,64 @@ +package io.quarkus.dev.testing; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +public class GrpcWebSocketProxy { + + private static final AtomicInteger connectionIdSeq = new AtomicInteger(); + + private static volatile WebSocketListener webSocketListener; + + private static final Map> webSocketConnections = new ConcurrentHashMap<>(); + + public static Integer addWebSocket(Consumer responseConsumer, + Consumer closeHandler) { + if (webSocketListener != null) { + int id = connectionIdSeq.getAndIncrement(); + webSocketListener.onOpen(id, responseConsumer); + + webSocketConnections.put(id, closeHandler); + return id; + } + return null; + } + + public static void closeAll() { + CountDownLatch latch = new CountDownLatch(webSocketConnections.size()); + for (Map.Entry> connection : webSocketConnections.entrySet()) { + connection.getValue().accept(latch::countDown); + webSocketListener.onClose(connection.getKey()); + } + try { + if (!latch.await(5, TimeUnit.SECONDS)) { + System.err.println("Failed to close all the websockets in 5 seconds"); + } + } catch (InterruptedException e) { + System.err.println("Interrupted while waiting for websockets to be closed"); + } + } + + public static void closeWebSocket(int id) { + webSocketListener.onClose(id); + } + + public static void setWebSocketListener(WebSocketListener listener) { + webSocketListener = listener; + } + + public static void addMessage(Integer socketId, String message) { + webSocketListener.newMessage(socketId, message); + } + + public interface WebSocketListener { + void onOpen(int id, Consumer responseConsumer); + + void newMessage(int id, String content); + + void onClose(int id); + } +} diff --git a/extensions/grpc/deployment/pom.xml b/extensions/grpc/deployment/pom.xml index 07f7d8cbabde8..c753299475663 100644 --- a/extensions/grpc/deployment/pom.xml +++ b/extensions/grpc/deployment/pom.xml @@ -53,7 +53,12 @@ quarkus-grpc-codegen - + + + io.quarkus + quarkus-vertx-http-deployment + + io.quarkus quarkus-resteasy-deployment diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java index ee6a95e9c2914..bf3320359475b 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java @@ -298,7 +298,7 @@ ServiceStartBuildItem initializeServer(GrpcServerRecorder recorder, GrpcConfigur } } - if (!bindables.isEmpty()) { + if (!bindables.isEmpty() || LaunchMode.current() == LaunchMode.DEVELOPMENT) { recorder.initializeGrpcServer(vertx.getVertx(), config, shutdown, blocking, launchModeBuildItem.getLaunchMode()); return new ServiceStartBuildItem(GRPC_SERVER); } diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleProcessor.java index d7a182649b62c..f977c493bdd96 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleProcessor.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleProcessor.java @@ -1,7 +1,5 @@ package io.quarkus.grpc.deployment.devmode; -import static io.quarkus.deployment.annotations.ExecutionTime.RUNTIME_INIT; - import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; @@ -17,44 +15,38 @@ import org.jboss.jandex.DotName; import org.jboss.jandex.IndexView; import org.jboss.jandex.MethodInfo; -import org.jboss.logging.Logger; import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.Message.Builder; import com.google.protobuf.MessageOrBuilder; import com.google.protobuf.util.JsonFormat; -import io.grpc.Channel; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.Marshaller; import io.grpc.MethodDescriptor.PrototypeMarshaller; import io.grpc.ServiceDescriptor; -import io.grpc.netty.NettyChannelBuilder; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.arc.runtime.BeanLookupSupplier; import io.quarkus.deployment.IsDevelopment; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.Consume; +import io.quarkus.deployment.annotations.ExecutionTime; import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem; import io.quarkus.deployment.builditem.ServiceStartBuildItem; import io.quarkus.dev.console.DevConsoleManager; -import io.quarkus.devconsole.spi.DevConsoleRouteBuildItem; +import io.quarkus.dev.testing.GrpcWebSocketProxy; import io.quarkus.devconsole.spi.DevConsoleRuntimeTemplateInfoBuildItem; import io.quarkus.grpc.deployment.GrpcDotNames; import io.quarkus.grpc.protoc.plugin.MutinyGrpcGenerator; import io.quarkus.grpc.runtime.devmode.GrpcDevConsoleRecorder; import io.quarkus.grpc.runtime.devmode.GrpcServices; -import io.vertx.core.Handler; -import io.vertx.ext.web.RoutingContext; +import io.quarkus.vertx.http.deployment.NonApplicationRootPathBuildItem; +import io.quarkus.vertx.http.deployment.RouteBuildItem; public class GrpcDevConsoleProcessor { - private static final Logger LOG = Logger.getLogger(GrpcDevConsoleProcessor.class); - @BuildStep(onlyIf = IsDevelopment.class) public void devConsoleInfo(BuildProducer beans, BuildProducer infos) { @@ -72,7 +64,8 @@ public void collectMessagePrototypes(CombinedIndexBuildItem index, IllegalArgumentException, InvocationTargetException, InvalidProtocolBufferException { Map messagePrototypes = new HashMap<>(); - for (Class grpcServiceClass : getGrpcServices(index.getIndex())) { + Collection> grpcServices = getGrpcServices(index.getIndex()); + for (Class grpcServiceClass : grpcServices) { Method method = grpcServiceClass.getDeclaredMethod("getServiceDescriptor"); ServiceDescriptor serviceDescriptor = (ServiceDescriptor) method.invoke(null); @@ -89,145 +82,18 @@ public void collectMessagePrototypes(CombinedIndexBuildItem index, } DevConsoleManager.setGlobal("io.quarkus.grpc.messagePrototypes", messagePrototypes); + GrpcWebSocketProxy.setWebSocketListener( + new GrpcDevConsoleWebSocketListener(grpcServices, Thread.currentThread().getContextClassLoader())); } @Consume(RuntimeConfigSetupCompleteBuildItem.class) - @Record(value = RUNTIME_INIT) + @Record(ExecutionTime.RUNTIME_INIT) @BuildStep(onlyIf = IsDevelopment.class) - DevConsoleRouteBuildItem registerTestEndpoint(GrpcDevConsoleRecorder recorder, CombinedIndexBuildItem index) - throws ClassNotFoundException, NoSuchMethodException, - SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { - // Store the server config so that it can be used in the test endpoint handler + public RouteBuildItem createWebSocketEndpoint(NonApplicationRootPathBuildItem nonApplicationRootPathBuildItem, + GrpcDevConsoleRecorder recorder) { recorder.setServerConfiguration(); - return new DevConsoleRouteBuildItem("test", "POST", new TestEndpointHandler(getGrpcServices(index.getIndex())), true); - } - - static class TestEndpointHandler implements Handler { - - private Map blockingStubs; - private Map serviceDescriptors; - private final Collection> grpcServiceClasses; - - TestEndpointHandler(Collection> grpcServiceClasses) { - this.grpcServiceClasses = grpcServiceClasses; - } - - void init() throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, - InvocationTargetException { - if (blockingStubs == null) { - blockingStubs = new HashMap<>(); - serviceDescriptors = new HashMap<>(); - - Map serverConfig = DevConsoleManager.getGlobal("io.quarkus.grpc.serverConfig"); - - if (Boolean.FALSE.equals(serverConfig.get("ssl"))) { - for (Class grpcServiceClass : grpcServiceClasses) { - - Method method = grpcServiceClass.getDeclaredMethod("getServiceDescriptor"); - ServiceDescriptor serviceDescriptor = (ServiceDescriptor) method.invoke(null); - serviceDescriptors.put(serviceDescriptor.getName(), serviceDescriptor); - - // TODO more config options - Channel channel = NettyChannelBuilder - .forAddress(serverConfig.get("host").toString(), (Integer) serverConfig.get("port")) - .usePlaintext() - .build(); - Method blockingStubFactoryMethod; - - try { - blockingStubFactoryMethod = grpcServiceClass.getDeclaredMethod("newBlockingStub", Channel.class); - } catch (NoSuchMethodException e) { - LOG.warnf("Ignoring gRPC service - newBlockingStub() method not declared on %s", grpcServiceClass); - continue; - } - - Object blockingStub = blockingStubFactoryMethod.invoke(null, channel); - blockingStubs.put(serviceDescriptor.getName(), blockingStub); - } - } - } - } - - @Override - public void handle(RoutingContext context) { - try { - // Lazily initialize the handler - init(); - } catch (Exception e) { - throw new IllegalStateException("Unable to initialize the test endpoint handler"); - } - - String serviceName = context.request().getParam("serviceName"); - String methodName = context.request().getParam("methodName"); - String testJsonData = context.getBodyAsString(); - - Object blockingStub = blockingStubs.get(serviceName); - - if (blockingStub == null) { - error(context, "No blocking stub found for: " + serviceName); - } else { - ServiceDescriptor serviceDescriptor = serviceDescriptors.get(serviceName); - MethodDescriptor methodDescriptor = null; - for (MethodDescriptor method : serviceDescriptor.getMethods()) { - if (method.getBareMethodName().equals(methodName)) { - methodDescriptor = method; - } - } - - if (methodDescriptor == null) { - error(context, "No method descriptor found for: " + serviceName + "/" + methodName); - } else { - - // We need to find the correct method declared on the blocking stub - Method stubMethod = null; - String realMethodName = decapitalize(methodDescriptor.getBareMethodName()); - - for (Method method : blockingStub.getClass().getDeclaredMethods()) { - if (method.getName().equals(realMethodName)) { - stubMethod = method; - } - } - - if (stubMethod == null) { - error(context, realMethodName + " method not declared on the " + blockingStub.getClass()); - } else { - - // Identify the request class - Marshaller requestMarshaller = methodDescriptor.getRequestMarshaller(); - if (requestMarshaller instanceof PrototypeMarshaller) { - PrototypeMarshaller protoMarshaller = (PrototypeMarshaller) requestMarshaller; - Class requestType = protoMarshaller.getMessagePrototype().getClass(); - - try { - // Create a new builder for the request message, e.g. HelloRequest.newBuilder() - Method newBuilderMethod = requestType.getDeclaredMethod("newBuilder"); - Message.Builder builder = (Builder) newBuilderMethod.invoke(null); - ; - - // Use the test data to build the request object - JsonFormat.parser().merge(testJsonData, builder); - - // Invoke the blocking stub method and format the response as JSON - Object response = stubMethod.invoke(blockingStub, builder.build()); - context.response().putHeader("Content-Type", "application/json"); - context.end(JsonFormat.printer().print((MessageOrBuilder) response)); - - } catch (Exception e) { - throw new IllegalStateException(e); - } - } else { - error(context, "Unable to identify the request type for: " + methodDescriptor); - } - } - } - } - - } - } - - static void error(RoutingContext rc, String message) { - LOG.warn(message); - rc.response().setStatusCode(500).end(message); + return nonApplicationRootPathBuildItem.routeBuilder().route("dev/grpc-test") + .handler(recorder.handler()).build(); } Collection> getGrpcServices(IndexView index) throws ClassNotFoundException { @@ -256,18 +122,4 @@ Collection> getGrpcServices(IndexView index) throws ClassNotFoundExcept } return serviceClasses; } - - static String decapitalize(String name) { - if (name == null || name.length() == 0) { - return name; - } - if (name.length() > 1 && Character.isUpperCase(name.charAt(1)) && - Character.isUpperCase(name.charAt(0))) { - return name; - } - char chars[] = name.toCharArray(); - chars[0] = Character.toLowerCase(chars[0]); - return new String(chars); - } - } diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleWebSocketListener.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleWebSocketListener.java new file mode 100644 index 0000000000000..0a5d470bb92d8 --- /dev/null +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/devmode/GrpcDevConsoleWebSocketListener.java @@ -0,0 +1,305 @@ +package io.quarkus.grpc.deployment.devmode; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +import org.jboss.logging.Logger; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.util.JsonFormat; + +import io.grpc.Channel; +import io.grpc.MethodDescriptor; +import io.grpc.ServiceDescriptor; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.StreamObserver; +import io.quarkus.dev.console.DevConsoleManager; +import io.quarkus.dev.testing.GrpcWebSocketProxy; +import io.vertx.core.json.JsonObject; + +public class GrpcDevConsoleWebSocketListener implements GrpcWebSocketProxy.WebSocketListener { + + private static final Logger log = Logger.getLogger(GrpcDevConsoleWebSocketListener.class); + + private Map grpcClientStubs; + private Map serviceDescriptors; + + private final ClassLoader deploymentClassLoader; + private final Collection> grpcServices; + + private final Map webSocketConnections = new ConcurrentHashMap<>(); + + public GrpcDevConsoleWebSocketListener(Collection> grpcServices, ClassLoader deploymentClassLoader) { + this.grpcServices = grpcServices; + this.deploymentClassLoader = deploymentClassLoader; + } + + private void handle(String input, WebSocketData websocketData) { + ClassLoader originalCl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(deploymentClassLoader); + try { + JsonObject grpcRequest = new JsonObject(input); + + // each message sent through this websocket has to have an ID + // this is an ID of the gRPC method invocation + // if client-side streaming is used, subsequent calls that should use the same + // stream should have the same ID + Integer id = grpcRequest.getInteger("id"); + String serviceName = grpcRequest.getString("serviceName"); + String methodName = grpcRequest.getString("methodName"); + + if ("DISCONNECT".equals(grpcRequest.getString("command"))) { + GrpcCallData grpcCall = websocketData.callsInProgress.get(id); + if (grpcCall != null && grpcCall.incomingStream != null) { + grpcCall.incomingStream.onCompleted(); + } + return; + } + + GrpcCallData grpcCall; + if (websocketData.callsInProgress.containsKey(id)) { + grpcCall = websocketData.callsInProgress.get(id); + } else { + Optional maybeOldCall = websocketData.callsInProgress.values() + .stream().filter(call -> call.methodName.equals(methodName) && call.serviceName.equals(serviceName)) + .findAny(); + maybeOldCall.ifPresent(call -> { + if (call.incomingStream != null) { + call.incomingStream.onCompleted(); + } + websocketData.callsInProgress.remove(call.requestId); + }); + + grpcCall = new GrpcCallData(); + grpcCall.serviceName = serviceName; + grpcCall.methodName = methodName; + grpcCall.requestId = id; + websocketData.callsInProgress.put(grpcCall.requestId, grpcCall); + } + + String testJsonData = grpcRequest.getString("content"); + Object grpcStub = grpcClientStubs.get(serviceName); + + if (grpcStub == null) { + websocketData.responseConsumer.accept(jsonResponse(id, "NO_STUB").encode()); + } else { + ServiceDescriptor serviceDescriptor = serviceDescriptors.get(serviceName); + MethodDescriptor methodDescriptor = null; + for (MethodDescriptor method : serviceDescriptor.getMethods()) { + if (method.getBareMethodName() != null && method.getBareMethodName().equals(methodName)) { + methodDescriptor = method; + } + } + if (methodDescriptor == null) { + websocketData.responseConsumer.accept(jsonResponse(id, "NO_DESCRIPTOR").encode()); + } else { + Method stubMethod = null; + String realMethodName = decapitalize(methodDescriptor.getBareMethodName()); + + for (Method method : grpcStub.getClass().getDeclaredMethods()) { + if (method.getName().equals(realMethodName)) { + stubMethod = method; + } + } + + if (stubMethod == null) { + websocketData.responseConsumer.accept(jsonResponse(id, "NO_METHOD").encode()); + log.error(realMethodName + " method not declared on the " + grpcStub.getClass()); + } else { + + // Identify the request class + MethodDescriptor.Marshaller requestMarshaller = methodDescriptor.getRequestMarshaller(); + if (requestMarshaller instanceof MethodDescriptor.PrototypeMarshaller) { + MethodDescriptor.PrototypeMarshaller protoMarshaller = (MethodDescriptor.PrototypeMarshaller) requestMarshaller; + Class requestType = protoMarshaller.getMessagePrototype().getClass(); + + try { + // Create a new builder for the request message, e.g. HelloRequest.newBuilder() + Method newBuilderMethod = requestType.getDeclaredMethod("newBuilder"); + Message.Builder builder = (Message.Builder) newBuilderMethod.invoke(null); + + // Use the test data to build the request object + JsonFormat.parser().merge(testJsonData, builder); + + Message message = builder.build(); + if (grpcCall.incomingStream != null) { + // we are already connected with this gRPC endpoint, just send the message + grpcCall.incomingStream.onNext(message); + } else { + // Invoke the stub method and format the response as JSON + + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(Object value) { + String body = null; + try { + body = JsonFormat.printer().print((MessageOrBuilder) value); + } catch (InvalidProtocolBufferException e) { + websocketData.responseConsumer + .accept(jsonResponse(id, "ERROR").put("body", e.getMessage()).encode()); + log.error("Failed to transform response to JSON", e); + } + JsonObject reply = jsonResponse(id, "PAYLOAD"); + reply.put("body", body); + websocketData.responseConsumer.accept(reply.encode()); + } + + @Override + public void onError(Throwable t) { + websocketData.responseConsumer + .accept(jsonResponse(id, "ERROR").put("body", t.getMessage()) + .encode()); + grpcCall.incomingStream = null; + log.error("Failure returned by gRPC service", t); + } + + @Override + public void onCompleted() { + websocketData.responseConsumer.accept(jsonResponse(id, "COMPLETED").encode()); + grpcCall.incomingStream = null; + } + }; + if (stubMethod.getParameterCount() == 1 + && stubMethod.getReturnType() == StreamObserver.class) { + // returned StreamObserver consumes incoming messages + //noinspection unchecked + grpcCall.incomingStream = (StreamObserver) stubMethod.invoke(grpcStub, + responseObserver); + grpcCall.incomingStream.onNext(message); + } else { + // incoming message should be passed as the first parameter of the invocation + stubMethod.invoke(grpcStub, message, responseObserver); + } + } + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + } + } + } + } finally { + Thread.currentThread().setContextClassLoader(originalCl); + } + } + + static String decapitalize(String name) { + if (name == null || name.length() == 0) { + return name; + } + if (name.length() > 1 && Character.isUpperCase(name.charAt(1)) && + Character.isUpperCase(name.charAt(0))) { + return name; + } + char[] chars = name.toCharArray(); + chars[0] = Character.toLowerCase(chars[0]); + return new String(chars); + } + + private JsonObject jsonResponse(Integer id, String status) { + return new JsonObject() + .put("id", id) + .put("status", status); + } + + public void init() { + Map serverConfig = DevConsoleManager.getGlobal("io.quarkus.grpc.serverConfig"); + + if (serviceDescriptors != null) { + return; + } + serviceDescriptors = new HashMap<>(); + grpcClientStubs = new HashMap<>(); + try { + if (serverConfig == null || Boolean.FALSE.equals(serverConfig.get("ssl"))) { + for (Class grpcServiceClass : grpcServices) { + + Method method = grpcServiceClass.getDeclaredMethod("getServiceDescriptor"); + ServiceDescriptor serviceDescriptor = (ServiceDescriptor) method.invoke(null); + serviceDescriptors.put(serviceDescriptor.getName(), serviceDescriptor); + + // TODO more config options + Channel channel = NettyChannelBuilder + .forAddress(serverConfig.get("host").toString(), (Integer) serverConfig.get("port")) + .usePlaintext() + .build(); + Method stubFactoryMethod; + + try { + stubFactoryMethod = grpcServiceClass.getDeclaredMethod("newStub", Channel.class); + } catch (NoSuchMethodException e) { + log.warnf("Ignoring gRPC service - newStub() method not declared on %s", grpcServiceClass); + continue; + } + + Object stub = stubFactoryMethod.invoke(null, channel); + grpcClientStubs.put(serviceDescriptor.getName(), stub); + } + } + } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) { + throw new IllegalStateException("Unable to initialize client stubs for gRPC Dev UI"); + } + } + + @Override + public void onOpen(int id, Consumer responseConsumer) { + init(); + webSocketConnections.put(id, new WebSocketData(responseConsumer)); + } + + @Override + public void newMessage(int id, String content) { + WebSocketData webSocketData = webSocketConnections.get(id); + if (webSocketData != null) { + handle(content, webSocketData); + } else { + log.warn("gRPC Dev Console WebSocket message for an unregistered WebSocket id"); + } + } + + @Override + public void onClose(int id) { + closeAllClients(id); + + webSocketConnections.remove(id); + } + + private void closeAllClients(int id) { + WebSocketData webSocketData = webSocketConnections.get(id); + + if (webSocketData != null) { + for (GrpcCallData callData : webSocketData.callsInProgress.values()) { + try { + callData.incomingStream.onCompleted(); + } catch (Exception ignored) { + } + } + } + } + + private static class GrpcCallData { + Integer requestId; + String serviceName; + String methodName; + StreamObserver incomingStream; + } + + // contains information about all the connection done by a single + // browser window, i.e. a single websocket + private static class WebSocketData { + final Consumer responseConsumer; + Map callsInProgress = new HashMap<>(); + + private WebSocketData(Consumer responseConsumer) { + this.responseConsumer = responseConsumer; + } + } +} diff --git a/extensions/grpc/deployment/src/main/resources/dev-templates/service.html b/extensions/grpc/deployment/src/main/resources/dev-templates/service.html index 8633ca93ca4b0..6d91e12b6edc8 100644 --- a/extensions/grpc/deployment/src/main/resources/dev-templates/service.html +++ b/extensions/grpc/deployment/src/main/resources/dev-templates/service.html @@ -1,12 +1,113 @@ {#include main} {#style} - span.app-class { - cursor:pointer; - color:blue; - text-decoration:underline; - } +.connection-status { + font-style: italic; + font-size: 0.8em; + text-align: right; +} + +.status { + position: fixed; + width: 80%; + z-index: 100; +} +.status span { + background-color: #f0f0c0; + padding: 2px 20px; + border-radius: 3px; +} {/style} {#script} + var grpcWS; + var requestId = 0; + var clearMessageTimeout; + + var connections = new Map(); + + function hideConnected(connection) { + const idBase = connection.queryIdentifier.replace('#', '/'); + const connectedElement = document.getElementById(`$\{idBase}_connected`); + if (connectedElement) { + connectedElement.style.display = 'none'; + document.getElementById(`$\{idBase}_disconnect`).style.display = 'none'; + } + } + + function showConnected(connection) { + const idBase = connection.queryIdentifier.replace('#', '/'); + const connectedElement = document.getElementById(`$\{idBase}_connected`); + if (connectedElement) { + connectedElement.style.display = 'block'; + document.getElementById(`$\{idBase}_disconnect`).style.display = 'block'; + } + } + + function connect() { + var wsUri; + if (window.location.protocol === "https:") { + wsUri = "wss:"; + } else { + wsUri = "ws:"; + } + + wsUri += "//" + window.location.host + "/q/dev/grpc-test"; + grpcWS = new WebSocket(wsUri); + grpcWS.onopen = function (event) { + console.log("websocket connected"); + info("Web Socket bridge to gRPC connected"); + } + + grpcWS.onerror = function (error) { + console.log("error on gRPC websocket", error); + } + + grpcWS.onclose = function () { + info("Web Socket bridge to gRPC disconnected, reconnecting"); + connections.forEach(connection => hideConnected(connection)); + connections.clear(); + setTimeout(connect, 2000); + } + + grpcWS.onmessage = function (event) { + const data = JSON.parse(event.data); + if (data.status == 'RESET') { + connections.clear(); + } else if (data.status == 'PAYLOAD') { + const connection = connections.get(data.id); + const responseElement = connection.responseElement; + if (connection.responseText != '') { + connection.responseText = '\n---------\n' + connection.responseText; + } + + connection.responseText = data.body + connection.responseText; + responseElement.value = connection.responseText; + } else if (data.status == 'COMPLETED') { + const connection = connections.get(data.id); + hideConnected(connection); + connections.delete(data.id); + } else if (data.status == 'ERROR') { + console.log("failure!", data); + const connection = connections.get(data.id); + const responseElement = connection.responseElement; + if (connection.responseText != '') { + connection.responseText = '\n---------\n' + connection.responseText; + } + connection.responseText = data.body + connection.responseText; + connection.responseText = "FAILURE:\n" + connection.responseText; + responseElement.value = connection.responseText; + } + } + } + + function info(message) { + clearTimeout(clearMessageTimeout); + const statusText = document.getElementById('status-text') + statusText.innerText = message; + const status = document.getElementById('status-info') + status.style.display = 'block'; + clearMessageTimeout = setTimeout(() => status.style.display = 'none', 2000); + } + $(document).ready(function(){ if (!ideKnown()) { return; @@ -17,31 +118,63 @@ $(this).addClass("app-class"); } }); - + $(".app-class").on("click", function() { openInIDE($(this).text()); }); + connect(); }); - - function sendTestRequest(serviceName, methodName) { + + function disconnect(serviceName, methodName, methodType) { + const queryIdentifier = `$\{serviceName}#$\{methodName}`; + const connection = Array.from(connections.values()).find(conn => conn.queryIdentifier == queryIdentifier); + connections.delete(connection.id); + const request = { + id: connection.id, + command: 'DISCONNECT' + } + hideConnected(connection); + grpcWS.send(JSON.stringify(request)); + } + + function sendTestRequest(serviceName, methodName, methodType) { + const queryIdentifier = `$\{serviceName}#$\{methodName}`; + var connection = Array.from(connections.values()).find(conn => conn.queryIdentifier == queryIdentifier); const testRequest = document.getElementById(serviceName + "/" + methodName + "_request"); - $.ajax({ - type: "POST", - url: "test?serviceName=" + encodeURIComponent(serviceName) + "&methodName=" + encodeURIComponent(methodName), - contentType: "application/json", - data: testRequest.value, - success: function (data) { - const testResponse = document.getElementById(serviceName + "/" + methodName + "_response"); - testResponse.value = JSON.stringify(data); - } - }); + if (!connection || methodType == 'UNARY') { + requestId ++; + connection = { + id: requestId, + queryIdentifier: queryIdentifier, + responseElement: document.getElementById(serviceName + "/" + methodName + "_response"), + responseText: '', + unary: methodType == 'UNARY' + }; + connections.set(requestId, connection); + } + + const request = { + serviceName: serviceName, + methodName: methodName, + id: requestId, + content: testRequest.value + }; + + grpcWS.send(JSON.stringify(request)); + + if (methodType != 'UNARY') { + showConnected(connection); + } } - + {/script} {#breadcrumbs} Services{/breadcrumbs} {#title}{info:grpcServices.get(currentRequest.params.get('name')).name}{/title} {#body} - +
+ +
+ {#let service=info:grpcServices.get(currentRequest.params.get('name'))}

{#when service.status} @@ -57,30 +190,48 @@

Implemented by: {service.serviceClass}
- + {#for method in service.methodsWithPrototypes}
-

{method.type} {method.bareMethodName}

+
+
+
+

{method.type} {method.bareMethodName}

+
+
+ {#when method.type} + {#is in BIDI_STREAMING CLIENT_STREAMING} + + {#is SERVER_STREAMING} + + {/when} +
+
+
{#if method.isTestable} - {#when method.type} - {#is UNARY} -
-
-
- -
- -
-
- -
-
-
- {/when} +
+
+
+ +
+ + +
+
+ +
+
+
{/if} {/for} - + {/let} - + {/body} {/include} \ No newline at end of file diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/devconsole/DevConsoleUnaryMethodTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/devconsole/DevConsoleUnaryMethodTest.java index 926a5650cf2c2..10b37d2d00e68 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/devconsole/DevConsoleUnaryMethodTest.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/devconsole/DevConsoleUnaryMethodTest.java @@ -1,6 +1,15 @@ package io.quarkus.grpc.devconsole; -import org.hamcrest.Matchers; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.jboss.logging.Logger; import org.jboss.shrinkwrap.api.ShrinkWrap; import org.jboss.shrinkwrap.api.spec.JavaArchive; import org.junit.jupiter.api.Test; @@ -9,23 +18,55 @@ import io.grpc.examples.helloworld.MutinyGreeterGrpc; import io.quarkus.grpc.server.services.MutinyHelloService; import io.quarkus.test.QuarkusDevModeTest; -import io.restassured.RestAssured; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.WebSocket; public class DevConsoleUnaryMethodTest { + private static final Logger log = Logger.getLogger(DevConsoleUnaryMethodTest.class); + @RegisterExtension static final QuarkusDevModeTest config = new QuarkusDevModeTest() .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addPackage(MutinyGreeterGrpc.class.getPackage()) .addClass(MutinyHelloService.class)); @Test - public void testUnaryMethodCall() { - RestAssured.with().body("{\n\"name\": \"Martin\"}") - .post("q/dev/io.quarkus.quarkus-grpc/test?serviceName=helloworld.Greeter&methodName=SayHello") - .then() - .statusCode(200) - .body(Matchers.containsString("Hello Martin")); + public void websocketTest() throws Exception { + Vertx vertx = Vertx.vertx(); + + try { + List incomingMessages = new CopyOnWriteArrayList<>(); + HttpClient client = vertx.createHttpClient(); + + client.webSocket(8080, "localhost", "/q/dev/grpc-test", result -> { + if (result.failed()) { + log.error("failure making a web socket connection", result.cause()); + return; + } + WebSocket webSocket = result.result(); + webSocket.handler(buffer -> incomingMessages.add(buffer.toString())); + webSocket + .writeTextMessage("{\"id\": 123, \"serviceName\": \"helloworld.Greeter\",\"methodName\": \"SayHello\"" + + ", \"content\": \"{\\\"name\\\": \\\"Martin\\\"}\"}"); + }); + + await().atMost(5, TimeUnit.SECONDS) + .until(() -> incomingMessages.size() > 0); + + assertThat(incomingMessages).hasSize(2); + + Optional payloadMessage = incomingMessages.stream().filter(msg -> msg.contains("PAYLOAD")).findFirst(); + assertThat(payloadMessage).isNotEmpty(); + assertThat(payloadMessage.get()).contains("Hello Martin"); + } finally { + CountDownLatch latch = new CountDownLatch(1); + vertx.close(whatever -> latch.countDown()); + if (!latch.await(30, TimeUnit.SECONDS)) { + log.warn("Waiting for the test vertx instance to stop failed"); + } + } } } diff --git a/extensions/grpc/runtime/pom.xml b/extensions/grpc/runtime/pom.xml index 1f3441fbbfa2e..d903d623e27e6 100644 --- a/extensions/grpc/runtime/pom.xml +++ b/extensions/grpc/runtime/pom.xml @@ -29,6 +29,12 @@ io.quarkus quarkus-arc + + + + io.quarkus + quarkus-vertx-http + io.quarkus quarkus-grpc-stubs diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java index 287eb3a0593d0..d9fa84bcc36de 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java @@ -98,7 +98,7 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier, if (GrpcServerReloader.getServer() == null) { devModeStart(grpcContainer, vertx, configuration, shutdown, launchMode); } else { - devModeReload(grpcContainer, vertx, configuration); + devModeReload(grpcContainer, vertx, configuration, shutdown); } } else { prodStart(grpcContainer, vertx, configuration, launchMode); @@ -269,7 +269,8 @@ public String getImplementationClassName() { } } - private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfiguration configuration) { + private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfiguration configuration, + ShutdownContext shutdown) { List services = collectServiceDefinitions(grpcContainer.getServices()); List definitions = new ArrayList<>(); @@ -299,6 +300,14 @@ private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerC initHealthStorage(); GrpcServerReloader.reinitialize(servicesWithInterceptors, methods, grpcContainer.getSortedInterceptors()); + + shutdown.addShutdownTask( + new Runnable() { // NOSONAR + @Override + public void run() { + GrpcServerReloader.reset(); + } + }); } public static int getVerticleCount() { diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcDevConsoleRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcDevConsoleRecorder.java index 812d7f722b911..8b9d823d9d0ae 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcDevConsoleRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcDevConsoleRecorder.java @@ -3,15 +3,24 @@ import java.util.HashMap; import java.util.Map; +import org.jboss.logging.Logger; + import io.quarkus.arc.Arc; import io.quarkus.arc.InstanceHandle; import io.quarkus.dev.console.DevConsoleManager; +import io.quarkus.dev.testing.GrpcWebSocketProxy; import io.quarkus.grpc.runtime.config.GrpcConfiguration; import io.quarkus.grpc.runtime.config.GrpcServerConfiguration; import io.quarkus.runtime.annotations.Recorder; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.ServerWebSocket; +import io.vertx.ext.web.RoutingContext; @Recorder public class GrpcDevConsoleRecorder { + private static final Logger log = Logger.getLogger(GrpcDevConsoleRecorder.class); public void setServerConfiguration() { try (InstanceHandle config = Arc.container().instance(GrpcConfiguration.class)) { @@ -24,4 +33,47 @@ public void setServerConfiguration() { } } + public Handler handler() { + return new Handler() { + @Override + public void handle(RoutingContext context) { + context.request().toWebSocket(webSocket -> { + if (webSocket.failed()) { + log.error("failed to connect web socket", webSocket.cause()); + } else { + ServerWebSocket serverWebSocket = webSocket.result(); + Integer socketId = GrpcWebSocketProxy.addWebSocket( + message -> serverWebSocket.writeTextMessage(message) + .onFailure(e -> log + .info("failed to send back message to the gRPC Dev Console WebSocket", e)), + runnable -> { + if (!serverWebSocket.isClosed()) { + serverWebSocket.close(new Handler>() { + @Override + public void handle(AsyncResult event) { + runnable.run(); + } + }); + } else { + runnable.run(); + } + }); + + if (socketId == null) { + log.error("No gRPC dev console WebSocketListener"); + serverWebSocket.close(); + return; + } + serverWebSocket.closeHandler(ignored -> GrpcWebSocketProxy.closeWebSocket(socketId)); + serverWebSocket.handler(new Handler() { + @Override + public void handle(Buffer event) { + GrpcWebSocketProxy.addMessage(socketId, event.toString()); + } + }); + } + }); + } + }; + } } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServerReloader.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServerReloader.java index a601059087cb4..6c56b683b0f64 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServerReloader.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServerReloader.java @@ -8,6 +8,7 @@ import io.grpc.ServerMethodDefinition; import io.grpc.ServerServiceDefinition; import io.grpc.internal.ServerImpl; +import io.quarkus.dev.testing.GrpcWebSocketProxy; import io.quarkus.grpc.runtime.ServerCalls; import io.quarkus.grpc.runtime.StreamCollector; import io.quarkus.runtime.LaunchMode; @@ -52,6 +53,7 @@ public static void reset() { throw new IllegalStateException("Non-dev mode streams collector used in development mode"); } ((DevModeStreamsCollector) streamCollector).shutdown(); + GrpcWebSocketProxy.closeAll(); } catch (NoSuchFieldException | IllegalAccessException e) { throw new IllegalStateException("Unable to reinitialize gRPC server", e); } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServices.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServices.java index 9227e86e6aa21..ae4a66fa982a8 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServices.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcServices.java @@ -146,10 +146,8 @@ public boolean hasPrototype() { } public boolean isTestable() { - if (configuration.server.ssl.certificate.isPresent() || configuration.server.ssl.keyStore.isPresent()) { - return false; - } - return MethodType.UNARY == getType(); + return !configuration.server.ssl.certificate.isPresent() + && !configuration.server.ssl.keyStore.isPresent(); } public String getPrototype() { diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/IOThreadClientInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/IOThreadClientInterceptor.java index b4a44661f58d3..625685af5b744 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/IOThreadClientInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/IOThreadClientInterceptor.java @@ -1,10 +1,5 @@ package io.quarkus.grpc.runtime.supports; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import javax.enterprise.context.ApplicationScoped; import javax.enterprise.inject.spi.Prioritized; @@ -20,6 +15,9 @@ import io.vertx.core.Context; import io.vertx.core.Vertx; +/** + * gRPC Client emissions should be on the event loop if the subscription is executed on the event loop + */ @ApplicationScoped public class IOThreadClientInterceptor implements ClientInterceptor, Prioritized { @@ -27,6 +25,7 @@ public class IOThreadClientInterceptor implements ClientInterceptor, Prioritized public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + boolean isOnEventLoop = Context.isOnEventLoopThread(); Context context = Vertx.currentContext(); return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { @@ -34,20 +33,11 @@ public ClientCall interceptCall(MethodDescriptor responseListener, Metadata headers) { super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener(responseListener) { - private volatile CompletableFuture onMessageCompletion; @Override public void onMessage(RespT message) { - if (context != null) { - onMessageCompletion = new CompletableFuture<>(); - context.runOnContext(unused -> { - try { - super.onMessage(message); - onMessageCompletion.complete(null); - } catch (Throwable any) { - onMessageCompletion.completeExceptionally(any); - } - }); + if (isOnEventLoop) { + context.runOnContext(unused -> super.onMessage(message)); } else { super.onMessage(message); } @@ -55,15 +45,8 @@ public void onMessage(RespT message) { @Override public void onClose(Status status, Metadata trailers) { - if (onMessageCompletion != null && !Context.isOnEventLoopThread()) { - try { - onMessageCompletion.get(60, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("`onMessage` failed or interrupted", e); - } catch (TimeoutException e) { - throw new RuntimeException("`onMessage` did not complete in 60 seconds"); - } - super.onClose(status, trailers); + if (isOnEventLoop) { + context.runOnContext(unused -> super.onClose(status, trailers)); } else { super.onClose(status, trailers); } diff --git a/extensions/reactive-messaging-http/deployment/src/main/java/io/quarkus/reactivemessaging/http/deployment/ReactiveHttpProcessor.java b/extensions/reactive-messaging-http/deployment/src/main/java/io/quarkus/reactivemessaging/http/deployment/ReactiveHttpProcessor.java index f0e0e746a3001..6d4a1721204f3 100644 --- a/extensions/reactive-messaging-http/deployment/src/main/java/io/quarkus/reactivemessaging/http/deployment/ReactiveHttpProcessor.java +++ b/extensions/reactive-messaging-http/deployment/src/main/java/io/quarkus/reactivemessaging/http/deployment/ReactiveHttpProcessor.java @@ -107,7 +107,7 @@ void registerHttpConnector(BuildProducer beanProducer, }); } if (!wsConfigs.isEmpty()) { - Handler handler = recorder.createWebSocketeHandler(); + Handler handler = recorder.createWebSocketHandler(); wsConfigs.stream() .map(WebSocketStreamConfig::path) diff --git a/extensions/reactive-messaging-http/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/ReactiveHttpRecorder.java b/extensions/reactive-messaging-http/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/ReactiveHttpRecorder.java index 7ae54238a128b..bd8bb135d36b5 100644 --- a/extensions/reactive-messaging-http/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/ReactiveHttpRecorder.java +++ b/extensions/reactive-messaging-http/runtime/src/main/java/io/quarkus/reactivemessaging/http/runtime/ReactiveHttpRecorder.java @@ -8,7 +8,7 @@ @Recorder public class ReactiveHttpRecorder { - public Handler createWebSocketeHandler() { + public Handler createWebSocketHandler() { ReactiveWebSocketHandlerBean bean = Arc.container().instance(ReactiveWebSocketHandlerBean.class).get(); return new ReactiveWebSocketHandler(bean); } diff --git a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/console/DevConsoleProcessor.java b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/console/DevConsoleProcessor.java index 7d0b351178e10..39c1e3ab7dc5d 100644 --- a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/console/DevConsoleProcessor.java +++ b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/console/DevConsoleProcessor.java @@ -319,8 +319,11 @@ public ServiceStartBuildItem setupDeploymentSideHandling(List groupAndArtifact = i.groupIdAndArtifactId(curateOutcomeBuildItem); // deployment side handling if (i.isDeploymentSide()) { - Route route = router.route(HttpMethod.valueOf(i.getMethod()), - "/" + groupAndArtifact.getKey() + "." + groupAndArtifact.getValue() + "/" + i.getPath()); + Route route = router + .route("/" + groupAndArtifact.getKey() + "." + groupAndArtifact.getValue() + "/" + i.getPath()); + if (i.getMethod() != null) { + route = route.method(HttpMethod.valueOf(i.getMethod())); + } if (i.isBodyHandlerRequired()) { route.handler(BodyHandler.create()); }