From 4c81250a07351be37a05d61db70c81314ad16d53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szynkiewicz?= Date: Mon, 31 May 2021 09:47:50 +0200 Subject: [PATCH] gRPC: fixed running client calls on event loop if needed --- .../supports/IOThreadClientInterceptor.java | 33 +++++-------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/IOThreadClientInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/IOThreadClientInterceptor.java index b4a44661f58d3..625685af5b744 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/IOThreadClientInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/IOThreadClientInterceptor.java @@ -1,10 +1,5 @@ package io.quarkus.grpc.runtime.supports; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import javax.enterprise.context.ApplicationScoped; import javax.enterprise.inject.spi.Prioritized; @@ -20,6 +15,9 @@ import io.vertx.core.Context; import io.vertx.core.Vertx; +/** + * gRPC Client emissions should be on the event loop if the subscription is executed on the event loop + */ @ApplicationScoped public class IOThreadClientInterceptor implements ClientInterceptor, Prioritized { @@ -27,6 +25,7 @@ public class IOThreadClientInterceptor implements ClientInterceptor, Prioritized public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + boolean isOnEventLoop = Context.isOnEventLoopThread(); Context context = Vertx.currentContext(); return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { @@ -34,20 +33,11 @@ public ClientCall interceptCall(MethodDescriptor responseListener, Metadata headers) { super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener(responseListener) { - private volatile CompletableFuture onMessageCompletion; @Override public void onMessage(RespT message) { - if (context != null) { - onMessageCompletion = new CompletableFuture<>(); - context.runOnContext(unused -> { - try { - super.onMessage(message); - onMessageCompletion.complete(null); - } catch (Throwable any) { - onMessageCompletion.completeExceptionally(any); - } - }); + if (isOnEventLoop) { + context.runOnContext(unused -> super.onMessage(message)); } else { super.onMessage(message); } @@ -55,15 +45,8 @@ public void onMessage(RespT message) { @Override public void onClose(Status status, Metadata trailers) { - if (onMessageCompletion != null && !Context.isOnEventLoopThread()) { - try { - onMessageCompletion.get(60, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("`onMessage` failed or interrupted", e); - } catch (TimeoutException e) { - throw new RuntimeException("`onMessage` did not complete in 60 seconds"); - } - super.onClose(status, trailers); + if (isOnEventLoop) { + context.runOnContext(unused -> super.onClose(status, trailers)); } else { super.onClose(status, trailers); }