Skip to content

Commit

Permalink
Merge pull request #15260 from michalszynkiewicz/unblock-grpc
Browse files Browse the repository at this point in the history
gRPC blocking: fix calling grpc client within blocking service
  • Loading branch information
michalszynkiewicz authored Feb 28, 2021
2 parents 7820c02 + 1f4e20c commit 5dc1e0b
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.quarkus.grpc.server;

import static org.assertj.core.api.Assertions.assertThat;

import javax.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld3.Greeter3Grpc;
import io.grpc.examples.helloworld3.HelloReply3;
import io.grpc.examples.helloworld3.HelloRequest3;
import io.quarkus.grpc.runtime.annotations.GrpcService;
import io.quarkus.grpc.server.services.GrpcCallWithinBlockingService;
import io.quarkus.grpc.server.services.HelloService;
import io.quarkus.test.QuarkusUnitTest;

public class ClientCallFromBlockingServiceTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addPackage(Greeter3Grpc.class.getPackage())
.addPackage(GreeterGrpc.class.getPackage())
.addClass(HelloService.class)
.addClass(GrpcCallWithinBlockingService.class))
.withConfigurationResource("call-from-blocking-service.properties");

@Inject
@GrpcService("service3")
Greeter3Grpc.Greeter3BlockingStub greeter3Client;

@Test
@Timeout(5)
void shouldWorkMultipleTimes() {
for (int i = 0; i < 20; i++) {
HelloReply3 reply = greeter3Client.sayHello(HelloRequest3.newBuilder().setName("Slim").build());
assertThat(reply.getMessage()).isEqualTo("response:Hello Slim");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.quarkus.grpc.server.services;

import javax.inject.Inject;
import javax.inject.Singleton;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.examples.helloworld3.Greeter3Grpc;
import io.grpc.examples.helloworld3.HelloReply3;
import io.grpc.examples.helloworld3.HelloRequest3;
import io.grpc.stub.StreamObserver;
import io.quarkus.grpc.runtime.annotations.GrpcService;
import io.smallrye.common.annotation.Blocking;

@Singleton
public class GrpcCallWithinBlockingService extends Greeter3Grpc.Greeter3ImplBase {

@Inject
@GrpcService("greeter")
GreeterGrpc.GreeterBlockingStub greeter;

@Override
@Blocking
public void sayHello(HelloRequest3 request, StreamObserver<HelloReply3> responseObserver) {
HelloReply reply = greeter.sayHello(HelloRequest.newBuilder().setName(request.getName()).build());
responseObserver.onNext(HelloReply3.newBuilder().setMessage("response:" + reply.getMessage()).build());
responseObserver.onCompleted();
}
}
24 changes: 24 additions & 0 deletions extensions/grpc/deployment/src/test/proto/helloworld3.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld3";
option java_outer_classname = "HelloWorldProto3";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter3 {
// Sends a greeting
rpc SayHello (HelloRequest3) returns (HelloReply3) {}
}

// The request message containing the user's name.
message HelloRequest3 {
string name = 1;
}

// The response message containing the greetings
message HelloReply3 {
string message = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
quarkus.grpc.clients.greeter.host=localhost
quarkus.grpc.clients.service3.host=localhost
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
package io.quarkus.grpc.runtime.supports;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.spi.Prioritized;

import io.grpc.*;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

Expand All @@ -21,21 +34,38 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
private volatile CompletableFuture<Void> onMessageCompletion;

@Override
public void onMessage(RespT message) {
runInContextIfNeed(() -> super.onMessage(message));
if (context != null) {
onMessageCompletion = new CompletableFuture<>();
context.runOnContext(unused -> {
try {
super.onMessage(message);
onMessageCompletion.complete(null);
} catch (Throwable any) {
onMessageCompletion.completeExceptionally(any);
}
});
} else {
super.onMessage(message);
}
}

@Override
public void onClose(Status status, Metadata trailers) {
runInContextIfNeed(() -> super.onClose(status, trailers));
}

private void runInContextIfNeed(Runnable fun) {
if (context != null) {
context.runOnContext(unused -> fun.run());
if (onMessageCompletion != null && !Context.isOnEventLoopThread()) {
try {
onMessageCompletion.get(60, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("`onMessage` failed or interrupted", e);
} catch (TimeoutException e) {
throw new RuntimeException("`onMessage` did not complete in 60 seconds");
}
super.onClose(status, trailers);
} else {
fun.run();
super.onClose(status, trailers);
}
}
}, headers);
Expand Down

0 comments on commit 5dc1e0b

Please sign in to comment.