Skip to content

Commit

Permalink
quarkus grpc - multiple server instances
Browse files Browse the repository at this point in the history
  • Loading branch information
michalszynkiewicz committed Jun 25, 2020
1 parent 12bcdca commit 3e88edd
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ServiceStartBuildItem build(GrpcServerRecorder recorder, GrpcConfiguration confi
ShutdownContextBuildItem shutdown, List<BindableServiceBuildItem> bindables,
VertxBuildItem vertx) {
if (!bindables.isEmpty()) {
recorder.initializeGrpcServer(vertx.getVertx(), config, shutdown);
recorder.initializeGrpcServer(vertx.getVertx(), config);
return new ServiceStartBuildItem(GRPC_SERVER);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.quarkus.grpc.server.scaling;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloRequest;
import io.quarkus.grpc.runtime.annotations.GrpcService;

public class BasicScalingTest {

@Inject
GreeterConsumer greetingConsumer;

Set<String> getThreadsUsedForRequests(int requestNo) throws InterruptedException, ExecutionException, TimeoutException {
List<Callable<String>> calls = new ArrayList<>();
for (int i = 0; i < requestNo; i++) {
calls.add(() -> {
System.out.println("invoking on " + Thread.currentThread()); // mstodo remove
return greetingConsumer.invoke("foo");
});
}
List<Future<String>> results = Executors.newFixedThreadPool(requestNo)
.invokeAll(calls);

Set<String> threads = new HashSet<>();
for (Future<String> result : results) {
threads.add(result.get(10, TimeUnit.SECONDS));
}
return threads;
}

@ApplicationScoped
static class GreeterConsumer {

@Inject
@GrpcService("hello-service")
GreeterGrpc.GreeterBlockingStub service;

public String invoke(String s) {
return service.sayHello(HelloRequest.newBuilder().setName(s).build())
.getMessage();
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.grpc.server.scaling;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.quarkus.test.QuarkusUnitTest;

public class MultipleGrpcVerticlesTest extends BasicScalingTest {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addPackage(GreeterGrpc.class.getPackage())
.addClass(ThreadReturningGreeterService.class))
.withConfigurationResource("multiple-instances-config.properties");

@Test
public void shouldUseMultipleThreads() throws InterruptedException, TimeoutException, ExecutionException {
Set<String> threads = getThreadsUsedForRequests(20);

assertThat(threads).hasSizeGreaterThan(1);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.grpc.server.scaling;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.quarkus.test.QuarkusUnitTest;

public class SingleGrpcVerticleTest extends BasicScalingTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addPackage(GreeterGrpc.class.getPackage())
.addClass(ThreadReturningGreeterService.class))
.withConfigurationResource("single-instance-config.properties");

@Test
public void shouldUseMultipleThreads() throws InterruptedException, TimeoutException, ExecutionException {
Set<String> threads = getThreadsUsedForRequests(20);

assertThat(threads).hasSize(1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.quarkus.grpc.server.scaling;

import javax.inject.Singleton;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;

@Singleton
public class ThreadReturningGreeterService extends GreeterGrpc.GreeterImplBase {

@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// mstodo remove
String threadName = Thread.currentThread().getName();
System.out.println("say hello invoked on " + threadName);
responseObserver.onNext(HelloReply.newBuilder().setMessage(threadName).build());
responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
quarkus.grpc.server.instances=20
quarkus.grpc.clients.hello-service.host=localhost
quarkus.grpc.clients.hello-service.keep-alive-timeout=1s
quarkus.vertx.event-loops-pool-size=20
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
quarkus.grpc.server.instances=1
quarkus.grpc.clients.hello-service.host=localhost
quarkus.grpc.clients.hello-service.keep-alive-timeout=1s
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.vertx.grpc.VertxServer;

// mstodo remove
public class GrpcServerHolder {

public static volatile VertxServer server;
Expand Down
Loading

0 comments on commit 3e88edd

Please sign in to comment.