diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/ConnectionPoolSizeTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/ConnectionPoolSizeTest.java new file mode 100644 index 0000000000000..bf318df4cb1f6 --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/ConnectionPoolSizeTest.java @@ -0,0 +1,122 @@ +package io.quarkus.rest.client.reactive; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; + +import org.eclipse.microprofile.rest.client.RestClientBuilder; +import org.jboss.resteasy.reactive.client.api.QuarkusRestClientProperties; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.rest.client.reactive.headers.ClientHeaderParamFromPropertyTest; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Vertx; + +public class ConnectionPoolSizeTest { + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class).addClasses(ClientHeaderParamFromPropertyTest.Client.class)); + + @TestHTTPResource + URI uri; + + @Test + void shouldPerform20CallsWithoutQueuing() throws InterruptedException { + Client client = RestClientBuilder.newBuilder().baseUri(uri) + .build(Client.class); + + CountDownLatch latch = executeCalls(client, 20); + + assertThat(latch.await(2, TimeUnit.SECONDS)) + .overridingErrorMessage("Failed to do 20 calls in 2 seconds") + .isTrue(); + } + + @Test + @Timeout(5) + void shouldPerform21CallsWithQueuing() throws InterruptedException { + Client client = RestClientBuilder.newBuilder().baseUri(uri) + .build(Client.class); + + long start = System.currentTimeMillis(); + CountDownLatch latch = executeCalls(client, 21); + latch.await(); + + assertThat(System.currentTimeMillis() - start).isLessThan(3000).isGreaterThanOrEqualTo(2000); + } + + @Test + @Timeout(5) + void shouldPerform5CallsWithoutQueueingOnQueue6() throws InterruptedException { + Client client = RestClientBuilder.newBuilder().baseUri(uri) + .property(QuarkusRestClientProperties.CONNECTION_POOL_SIZE, 6) + .build(Client.class); + + long start = System.currentTimeMillis(); + CountDownLatch latch = executeCalls(client, 5); + latch.await(); + + assertThat(System.currentTimeMillis() - start).isLessThan(2000); + } + + @Test + @Timeout(5) + void shouldPerform5CallsWithQueueingOnQueue4() throws InterruptedException { + Client client = RestClientBuilder.newBuilder().baseUri(uri) + .property(QuarkusRestClientProperties.CONNECTION_POOL_SIZE, 4) + .build(Client.class); + + long start = System.currentTimeMillis(); + CountDownLatch latch = executeCalls(client, 5); + latch.await(); + + assertThat(System.currentTimeMillis() - start).isLessThan(3000).isGreaterThanOrEqualTo(2000); + } + + private CountDownLatch executeCalls(Client client, int callAmount) { + ExecutorService executorService = Executors.newFixedThreadPool(callAmount); + CountDownLatch latch = new CountDownLatch(callAmount); + for (int i = 0; i < callAmount; i++) { + executorService.execute(() -> { + String result = client.get(); + latch.countDown(); + assertThat(result).isEqualTo("hello, world!"); + }); + } + return latch; + } + + @Path("/hello") + public interface Client { + @GET + String get(); + } + + @Path("/hello") + public static class SlowResource { + @Inject + Vertx vertx; + + @GET + public Uni getSlowly() { + return Uni.createFrom().emitter(emitter -> vertx.setTimer(1000 /* ms */, + val -> emitter.complete("hello, world!"))); + } + } + +} diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientImpl.java index 8c73b1112f659..728fcc3fc0176 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientImpl.java @@ -70,6 +70,7 @@ public class ClientImpl implements Client { private static final Logger log = Logger.getLogger(ClientImpl.class); // TODO: remove private static final int DEFAULT_CONNECT_TIMEOUT = 15000; + private static final int DEFAULT_CONNECTION_POOL_SIZE = 20; final ClientContext clientContext; final boolean closeVertx; @@ -125,10 +126,13 @@ public Vertx get() { } Object connectionPoolSize = configuration.getProperty(CONNECTION_POOL_SIZE); - if (connectionPoolSize != null) { - log.infof("Setting connectionPoolSize to %d s", connectionPoolSize); - options.setMaxPoolSize((int) connectionPoolSize); + if (connectionPoolSize == null) { + connectionPoolSize = DEFAULT_CONNECTION_POOL_SIZE; + } else { + log.debugf("Setting connectionPoolSize to %d s", connectionPoolSize); } + options.setMaxPoolSize((int) connectionPoolSize); + this.httpClient = this.vertx.createHttpClient(options); handlerChain = new HandlerChain(followRedirects); }