Skip to content

Commit

Permalink
quarkus grpc - multiple server instances
Browse files Browse the repository at this point in the history
  • Loading branch information
michalszynkiewicz committed Jun 25, 2020
1 parent 12bcdca commit f348d1e
Show file tree
Hide file tree
Showing 12 changed files with 291 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ServiceStartBuildItem build(GrpcServerRecorder recorder, GrpcConfiguration confi
ShutdownContextBuildItem shutdown, List<BindableServiceBuildItem> bindables,
VertxBuildItem vertx) {
if (!bindables.isEmpty()) {
recorder.initializeGrpcServer(vertx.getVertx(), config, shutdown);
recorder.initializeGrpcServer(vertx.getVertx(), config);
return new ServiceStartBuildItem(GRPC_SERVER);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import io.grpc.BindableService;
import io.grpc.ServerServiceDefinition;
import io.quarkus.grpc.runtime.GrpcServerHolder;
import io.quarkus.grpc.runtime.GrpcServerRecorder;
import io.quarkus.test.QuarkusUnitTest;

public class GrpcServerTest {
Expand All @@ -36,7 +36,7 @@ public void test() {
assertThat(services.stream().collect(Collectors.toList())).hasSize(2)
.anySatisfy(b -> assertThat(b.bindService().getServiceDescriptor().getName()).isEqualTo("service1"))
.anySatisfy(b -> assertThat(b.bindService().getServiceDescriptor().getName()).isEqualTo("service2"));
assertThat(GrpcServerHolder.server.getPort()).isEqualTo(9000);
assertThat(GrpcServerRecorder.getVerticleCount()).isGreaterThan(0);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.grpc.runtime.GrpcServerHolder;
import io.quarkus.grpc.runtime.GrpcServerRecorder;
import io.quarkus.test.QuarkusUnitTest;

/**
Expand All @@ -21,6 +21,6 @@ public class NoServerTest {

@Test
public void test() {
assertThat(GrpcServerHolder.server).isNull();
assertThat(GrpcServerRecorder.getVerticleCount()).isZero();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.grpc.server.scaling;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.quarkus.test.QuarkusUnitTest;

public class MultipleGrpcVerticlesTest extends ScalingTestBase {
@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addPackage(GreeterGrpc.class.getPackage())
.addClass(ThreadReturningGreeterService.class))
.withConfigurationResource("multiple-instances-config.properties");

@Test
public void shouldUseMultipleThreads() throws InterruptedException, TimeoutException, ExecutionException {
Set<String> threads = getThreadsUsedFor100Requests();

assertThat(threads).hasSizeGreaterThan(1);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.quarkus.grpc.server.scaling;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;

public class ScalingTestBase {

Set<String> getThreadsUsedFor100Requests() throws InterruptedException, ExecutionException, TimeoutException {
int requestNo = 100;
List<Callable<String>> calls = new ArrayList<>();
for (int i = 0; i < requestNo; i++) {
calls.add(() -> {
Channel channel = ManagedChannelBuilder.forAddress("localhost", 9000)
.usePlaintext()
.build();
HelloReply reply = GreeterGrpc.newBlockingStub(channel)
.sayHello(HelloRequest.newBuilder().setName("foo").build());
return reply.getMessage();
});
}
List<Future<String>> results = Executors.newFixedThreadPool(requestNo)
.invokeAll(calls);

Set<String> threads = new HashSet<>();
for (Future<String> result : results) {
threads.add(result.get(10, TimeUnit.SECONDS));
}
return threads;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.grpc.server.scaling;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.quarkus.test.QuarkusUnitTest;

public class SingleGrpcVerticleTest extends ScalingTestBase {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addPackage(GreeterGrpc.class.getPackage())
.addClass(ThreadReturningGreeterService.class))
.withConfigurationResource("single-instance-config.properties");

@Test
public void shouldUseMultipleThreads() throws InterruptedException, TimeoutException, ExecutionException {
Set<String> threads = getThreadsUsedFor100Requests();

assertThat(threads).hasSize(1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.grpc.server.scaling;

import javax.inject.Singleton;

import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;

@Singleton
public class ThreadReturningGreeterService extends GreeterGrpc.GreeterImplBase {

@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
String threadName = Thread.currentThread().getName();
responseObserver.onNext(HelloReply.newBuilder().setMessage(threadName).build());
responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
quarkus.grpc.server.instances=3
quarkus.grpc.clients.hello-service.host=localhost
quarkus.grpc.clients.hello-service.keep-alive-timeout=1s
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
quarkus.grpc.server.instances=1
quarkus.grpc.clients.hello-service.host=localhost
quarkus.grpc.clients.hello-service.keep-alive-timeout=1s

This file was deleted.

Loading

0 comments on commit f348d1e

Please sign in to comment.