diff --git a/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/DataSourceReactiveRuntimeConfig.java b/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/DataSourceReactiveRuntimeConfig.java index 709d06a23dcaf..e527466abe325 100644 --- a/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/DataSourceReactiveRuntimeConfig.java +++ b/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/DataSourceReactiveRuntimeConfig.java @@ -88,4 +88,10 @@ public class DataSourceReactiveRuntimeConfig { */ @ConfigItem public PfxConfiguration keyCertificatePfx; + + /** + * Experimental: use one connection pool per thread. + */ + @ConfigItem + public Optional threadLocal; } diff --git a/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ThreadLocalPool.java b/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ThreadLocalPool.java new file mode 100644 index 0000000000000..cacc1330bfc95 --- /dev/null +++ b/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ThreadLocalPool.java @@ -0,0 +1,150 @@ +package io.quarkus.reactive.datasource.runtime; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.StampedLock; + +import org.jboss.logging.Logger; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.sqlclient.Pool; +import io.vertx.sqlclient.PoolOptions; +import io.vertx.sqlclient.PreparedQuery; +import io.vertx.sqlclient.Query; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.Transaction; + +public abstract class ThreadLocalPool implements Pool { + + private static final Logger log = Logger.getLogger(ThreadLocalPool.class); + + private final AtomicReference poolset = new AtomicReference<>(new ThreadLocalPoolSet()); + + protected final PoolOptions poolOptions; + protected final Vertx vertx; + + public ThreadLocalPool(Vertx vertx, PoolOptions poolOptions) { + this.vertx = vertx; + this.poolOptions = poolOptions; + } + + private PoolType pool() { + //We re-try to be nice on an extremely unlikely race condition. + //3 attempts should be more than enough: + //especially consider that if this race is triggered, then someone is trying to use the pool on shutdown, + //which is inherently a broken plan. + for (int i = 0; i < 3; i++) { + final ThreadLocalPoolSet currentConnections = poolset.get(); + PoolType p = currentConnections.getPool(); + if (p != null) + return p; + } + throw new IllegalStateException("Multiple attempts to reopen a new pool on a closed instance: aborting"); + } + + protected abstract PoolType createThreadLocalPool(); + + @Override + public void getConnection(Handler> handler) { + pool().getConnection(handler); + } + + @Override + public Query> query(String sql) { + return pool().query(sql); + } + + @Override + public PreparedQuery> preparedQuery(String sql) { + return pool().preparedQuery(sql); + } + + @Override + public void begin(Handler> handler) { + pool().begin(handler); + } + + /** + * This is a bit weird because it works on all ThreadLocal pools, but it's only + * called from a single thread, when doing shutdown, and needs to close all the + * pools and reinitialise the thread local so that all newly created pools after + * the restart will start with an empty thread local instead of a closed one. + * N.B. while we take care of the pool to behave as best as we can, + * it's responsibility of the user of the returned pools to not use them + * while a close is being requested. + */ + @Override + public void close() { + // close all the thread-local pools, then discard the current ThreadLocal pool. + // Atomically set a new pool to be used: useful for live-reloading. + final ThreadLocalPoolSet previousPool = poolset.getAndSet(new ThreadLocalPoolSet()); + previousPool.close(); + } + + private class ThreadLocalPoolSet { + final List threadLocalPools = new ArrayList<>(); + final ThreadLocal threadLocal = new ThreadLocal<>(); + final StampedLock stampedLock = new StampedLock(); + boolean isOpen = true; + + public PoolType getPool() { + final long optimisticRead = stampedLock.tryOptimisticRead(); + if (isOpen == false) { + //Let the caller re-try on a different instance + return null; + } + PoolType ret = threadLocal.get(); + if (ret != null) { + if (stampedLock.validate(optimisticRead)) { + return ret; + } else { + //On invalid optimisticRead stamp, it means this pool instance was closed: + //let the caller re-try on a different instance + return null; + } + } else { + //Now acquire an exclusive readlock: + final long readLock = stampedLock.tryConvertToReadLock(optimisticRead); + //Again, on failure the pool was closed, return null in such case. + if (readLock == 0) + return null; + //else, we own the exclusive read lock and can now enter our slow path: + try { + log.debugf("Making pool for thread: %s", Thread.currentThread()); + ret = createThreadLocalPool(); + synchronized (threadLocalPools) { + threadLocalPools.add(ret); + } + threadLocal.set(ret); + return ret; + } finally { + stampedLock.unlockRead(readLock); + } + } + } + + public void close() { + final long lock = stampedLock.writeLock(); + try { + isOpen = false; + //While this synchronized block might take a while as we have to close all + //pool instances, it shouldn't block the getPool method as contention is + //prevented by the exclusive stamped lock. + synchronized (threadLocalPools) { + for (Pool pool : threadLocalPools) { + log.debugf("Closing pool: %s", pool); + pool.close(); + } + } + } finally { + stampedLock.unlockWrite(lock); + } + } + } + +} diff --git a/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/DB2PoolRecorder.java b/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/DB2PoolRecorder.java index 092134f90e202..c26b5dd0ef737 100644 --- a/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/DB2PoolRecorder.java +++ b/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/DB2PoolRecorder.java @@ -46,6 +46,10 @@ private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntim dataSourceReactiveDB2Config); DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveDB2Config); + if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() && + dataSourceReactiveRuntimeConfig.threadLocal.get()) { + return new ThreadLocalDB2Pool(vertx, connectOptions, poolOptions); + } return DB2Pool.pool(vertx, connectOptions, poolOptions); } diff --git a/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/ThreadLocalDB2Pool.java b/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/ThreadLocalDB2Pool.java new file mode 100644 index 0000000000000..a9b6b6fd6ef24 --- /dev/null +++ b/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/ThreadLocalDB2Pool.java @@ -0,0 +1,22 @@ +package io.quarkus.reactive.db2.client.runtime; + +import io.quarkus.reactive.datasource.runtime.ThreadLocalPool; +import io.vertx.core.Vertx; +import io.vertx.db2client.DB2ConnectOptions; +import io.vertx.db2client.DB2Pool; +import io.vertx.sqlclient.PoolOptions; + +public class ThreadLocalDB2Pool extends ThreadLocalPool implements DB2Pool { + + private final DB2ConnectOptions db2ConnectOptions; + + public ThreadLocalDB2Pool(Vertx vertx, DB2ConnectOptions db2ConnectOptions, PoolOptions poolOptions) { + super(vertx, poolOptions); + this.db2ConnectOptions = db2ConnectOptions; + } + + @Override + protected DB2Pool createThreadLocalPool() { + return DB2Pool.pool(vertx, db2ConnectOptions, poolOptions); + } +} diff --git a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java index f906ce8dd6a34..6a16b2c184cdd 100644 --- a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java +++ b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java @@ -68,6 +68,10 @@ private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt dataSourceReactiveMySQLConfig); MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig); + if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() && + dataSourceReactiveRuntimeConfig.threadLocal.get()) { + return new ThreadLocalMySQLPool(vertx, mysqlConnectOptions, poolOptions); + } return MySQLPool.pool(vertx, mysqlConnectOptions, poolOptions); } diff --git a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/ThreadLocalMySQLPool.java b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/ThreadLocalMySQLPool.java new file mode 100644 index 0000000000000..a6ed317e8fbb4 --- /dev/null +++ b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/ThreadLocalMySQLPool.java @@ -0,0 +1,22 @@ +package io.quarkus.reactive.mysql.client.runtime; + +import io.quarkus.reactive.datasource.runtime.ThreadLocalPool; +import io.vertx.core.Vertx; +import io.vertx.mysqlclient.MySQLConnectOptions; +import io.vertx.mysqlclient.MySQLPool; +import io.vertx.sqlclient.PoolOptions; + +public class ThreadLocalMySQLPool extends ThreadLocalPool implements MySQLPool { + + private final MySQLConnectOptions mySQLConnectOptions; + + public ThreadLocalMySQLPool(Vertx vertx, MySQLConnectOptions mySQLConnectOptions, PoolOptions poolOptions) { + super(vertx, poolOptions); + this.mySQLConnectOptions = mySQLConnectOptions; + } + + @Override + protected MySQLPool createThreadLocalPool() { + return MySQLPool.pool(vertx, mySQLConnectOptions, poolOptions); + } +} diff --git a/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/PgPoolRecorder.java b/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/PgPoolRecorder.java index dcf0c1568c766..70e6b8a0f9b45 100644 --- a/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/PgPoolRecorder.java +++ b/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/PgPoolRecorder.java @@ -68,6 +68,10 @@ private PgPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntime dataSourceReactivePostgreSQLConfig); PgConnectOptions pgConnectOptions = toPgConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactivePostgreSQLConfig); + if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() && + dataSourceReactiveRuntimeConfig.threadLocal.get()) { + return new ThreadLocalPgPool(vertx, pgConnectOptions, poolOptions); + } return PgPool.pool(vertx, pgConnectOptions, poolOptions); } diff --git a/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/ThreadLocalPgPool.java b/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/ThreadLocalPgPool.java new file mode 100644 index 0000000000000..d5e5e9e86c5f5 --- /dev/null +++ b/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/ThreadLocalPgPool.java @@ -0,0 +1,22 @@ +package io.quarkus.reactive.pg.client.runtime; + +import io.quarkus.reactive.datasource.runtime.ThreadLocalPool; +import io.vertx.core.Vertx; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.PgPool; +import io.vertx.sqlclient.PoolOptions; + +public class ThreadLocalPgPool extends ThreadLocalPool implements PgPool { + + private final PgConnectOptions pgConnectOptions; + + public ThreadLocalPgPool(Vertx vertx, PgConnectOptions pgConnectOptions, PoolOptions poolOptions) { + super(vertx, poolOptions); + this.pgConnectOptions = pgConnectOptions; + } + + @Override + protected PgPool createThreadLocalPool() { + return PgPool.pool(vertx, pgConnectOptions, poolOptions); + } +} diff --git a/integration-tests/reactive-pg-client/pom.xml b/integration-tests/reactive-pg-client/pom.xml index a04e31ab58d87..81e304f65c6af 100644 --- a/integration-tests/reactive-pg-client/pom.xml +++ b/integration-tests/reactive-pg-client/pom.xml @@ -38,6 +38,11 @@ quarkus-junit5 test + + io.quarkus + quarkus-junit5-internal + test + io.rest-assured rest-assured @@ -51,6 +56,11 @@ src/main/resources true + + src/test/resources + true + ../test-classes + diff --git a/integration-tests/reactive-pg-client/src/test/java/io/quarkus/it/reactive/pg/client/HotReloadFruitResource.java b/integration-tests/reactive-pg-client/src/test/java/io/quarkus/it/reactive/pg/client/HotReloadFruitResource.java new file mode 100644 index 0000000000000..17a2937681a6c --- /dev/null +++ b/integration-tests/reactive-pg-client/src/test/java/io/quarkus/it/reactive/pg/client/HotReloadFruitResource.java @@ -0,0 +1,53 @@ +package io.quarkus.it.reactive.pg.client; + +import java.util.concurrent.CompletionStage; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.mutiny.pgclient.PgPool; +import io.vertx.mutiny.sqlclient.Row; + +@Path("/hot-fruits") +public class HotReloadFruitResource { + + @Inject + PgPool client; + + @PostConstruct + void setupDb() { + client.query("DROP TABLE IF EXISTS fruits").execute() + .flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)").execute()) + .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Orange')").execute()) + .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pear')").execute()) + .flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Apple')").execute()) + .await().indefinitely(); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + public CompletionStage listFruits() { + return client.query("SELECT * FROM fruits").execute() + .map(pgRowSet -> { + JsonArray jsonArray = new JsonArray(); + for (Row row : pgRowSet) { + jsonArray.add(toJson(row)); + } + return jsonArray; + }) + .subscribeAsCompletionStage(); + } + + private JsonObject toJson(Row row) { + return new JsonObject() + .put("id", row.getLong("id")) + .put("name", row.getString("name")); + } + +} diff --git a/integration-tests/reactive-pg-client/src/test/resources/application-tl.properties b/integration-tests/reactive-pg-client/src/test/resources/application-tl.properties new file mode 100644 index 0000000000000..54e42ebfd6ceb --- /dev/null +++ b/integration-tests/reactive-pg-client/src/test/resources/application-tl.properties @@ -0,0 +1,6 @@ +quarkus.datasource.db-kind=postgresql +quarkus.datasource.username=hibernate_orm_test +quarkus.datasource.password=hibernate_orm_test +quarkus.datasource.reactive.url=${reactive-postgres.url} +quarkus.datasource.reactive.thread-local=true +quarkus.log.category."io.quarkus.reactive.datasource".level=DEBUG diff --git a/test-framework/junit5-internal/src/main/java/io/quarkus/test/QuarkusDevModeTest.java b/test-framework/junit5-internal/src/main/java/io/quarkus/test/QuarkusDevModeTest.java index 41554726513d3..c06aafeb2b99b 100644 --- a/test-framework/junit5-internal/src/main/java/io/quarkus/test/QuarkusDevModeTest.java +++ b/test-framework/junit5-internal/src/main/java/io/quarkus/test/QuarkusDevModeTest.java @@ -17,9 +17,13 @@ import java.util.List; import java.util.ServiceLoader; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.jar.Attributes; import java.util.jar.Manifest; +import java.util.logging.LogManager; +import java.util.logging.LogRecord; +import java.util.logging.Logger; import java.util.stream.Stream; import org.jboss.shrinkwrap.api.exporter.ExplodedExporter; @@ -58,14 +62,18 @@ public class QuarkusDevModeTest implements BeforeEachCallback, AfterEachCallback, TestInstanceFactory { + private static final Logger rootLogger; + static { System.setProperty("java.util.logging.manager", "org.jboss.logmanager.LogManager"); + rootLogger = LogManager.getLogManager().getLogger(""); } private DevModeMain devModeMain; private Path deploymentDir; private Supplier archiveProducer; private String logFileName; + private InMemoryLogHandler inMemoryLogHandler = new InMemoryLogHandler((r) -> false); private Path deploymentSourcePath; private Path deploymentResourcePath; @@ -97,6 +105,15 @@ public QuarkusDevModeTest setLogFileName(String logFileName) { return this; } + public QuarkusDevModeTest setLogRecordPredicate(Predicate predicate) { + this.inMemoryLogHandler = new InMemoryLogHandler(predicate); + return this; + } + + public List getLogRecords() { + return inMemoryLogHandler.records; + } + public Object createTestInstance(TestInstanceFactoryContext factoryContext, ExtensionContext extensionContext) throws TestInstantiationException { try { @@ -110,6 +127,7 @@ public Object createTestInstance(TestInstanceFactoryContext factoryContext, Exte @Override public void beforeEach(ExtensionContext extensionContext) throws Exception { + rootLogger.addHandler(inMemoryLogHandler); if (archiveProducer == null) { throw new RuntimeException("QuarkusDevModeTest does not have archive producer set"); } @@ -174,6 +192,7 @@ public void afterEach(ExtensionContext extensionContext) throws Exception { FileUtil.deleteDirectory(deploymentDir); } } + rootLogger.removeHandler(inMemoryLogHandler); } private DevModeContext exportArchive(Path deploymentDir, Path testSourceDir) {