Skip to content

Commit

Permalink
gRPC: fixed running client calls on event loop if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
michalszynkiewicz committed May 31, 2021
1 parent f01abe4 commit 4c81250
Showing 1 changed file with 8 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package io.quarkus.grpc.runtime.supports;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.spi.Prioritized;

Expand All @@ -20,50 +15,38 @@
import io.vertx.core.Context;
import io.vertx.core.Vertx;

/**
* gRPC Client emissions should be on the event loop if the subscription is executed on the event loop
*/
@ApplicationScoped
public class IOThreadClientInterceptor implements ClientInterceptor, Prioritized {

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {

boolean isOnEventLoop = Context.isOnEventLoopThread();
Context context = Vertx.currentContext();

return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
private volatile CompletableFuture<Void> onMessageCompletion;

@Override
public void onMessage(RespT message) {
if (context != null) {
onMessageCompletion = new CompletableFuture<>();
context.runOnContext(unused -> {
try {
super.onMessage(message);
onMessageCompletion.complete(null);
} catch (Throwable any) {
onMessageCompletion.completeExceptionally(any);
}
});
if (isOnEventLoop) {
context.runOnContext(unused -> super.onMessage(message));
} else {
super.onMessage(message);
}
}

@Override
public void onClose(Status status, Metadata trailers) {
if (onMessageCompletion != null && !Context.isOnEventLoopThread()) {
try {
onMessageCompletion.get(60, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("`onMessage` failed or interrupted", e);
} catch (TimeoutException e) {
throw new RuntimeException("`onMessage` did not complete in 60 seconds");
}
super.onClose(status, trailers);
if (isOnEventLoop) {
context.runOnContext(unused -> super.onClose(status, trailers));
} else {
super.onClose(status, trailers);
}
Expand Down

0 comments on commit 4c81250

Please sign in to comment.