Skip to content

Commit

Permalink
Merge pull request #10228 from Sanne/pgpoolconcurrency
Browse files Browse the repository at this point in the history
Rethink closing strategy of the ThreadLocalPool optimised pgpool
  • Loading branch information
geoand authored Jun 25, 2020
2 parents e712f74 + 2064742 commit 7c01526
Show file tree
Hide file tree
Showing 12 changed files with 322 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,10 @@ public class DataSourceReactiveRuntimeConfig {
*/
@ConfigItem
public PfxConfiguration keyCertificatePfx;

/**
* Experimental: use one connection pool per thread.
*/
@ConfigItem
public Optional<Boolean> threadLocal;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package io.quarkus.reactive.datasource.runtime;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.StampedLock;

import org.jboss.logging.Logger;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.PreparedQuery;
import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.Transaction;

public abstract class ThreadLocalPool<PoolType extends Pool> implements Pool {

private static final Logger log = Logger.getLogger(ThreadLocalPool.class);

private final AtomicReference<ThreadLocalPoolSet> poolset = new AtomicReference<>(new ThreadLocalPoolSet());

protected final PoolOptions poolOptions;
protected final Vertx vertx;

public ThreadLocalPool(Vertx vertx, PoolOptions poolOptions) {
this.vertx = vertx;
this.poolOptions = poolOptions;
}

private PoolType pool() {
//We re-try to be nice on an extremely unlikely race condition.
//3 attempts should be more than enough:
//especially consider that if this race is triggered, then someone is trying to use the pool on shutdown,
//which is inherently a broken plan.
for (int i = 0; i < 3; i++) {
final ThreadLocalPoolSet currentConnections = poolset.get();
PoolType p = currentConnections.getPool();
if (p != null)
return p;
}
throw new IllegalStateException("Multiple attempts to reopen a new pool on a closed instance: aborting");
}

protected abstract PoolType createThreadLocalPool();

@Override
public void getConnection(Handler<AsyncResult<SqlConnection>> handler) {
pool().getConnection(handler);
}

@Override
public Query<RowSet<Row>> query(String sql) {
return pool().query(sql);
}

@Override
public PreparedQuery<RowSet<Row>> preparedQuery(String sql) {
return pool().preparedQuery(sql);
}

@Override
public void begin(Handler<AsyncResult<Transaction>> handler) {
pool().begin(handler);
}

/**
* This is a bit weird because it works on all ThreadLocal pools, but it's only
* called from a single thread, when doing shutdown, and needs to close all the
* pools and reinitialise the thread local so that all newly created pools after
* the restart will start with an empty thread local instead of a closed one.
* N.B. while we take care of the pool to behave as best as we can,
* it's responsibility of the user of the returned pools to not use them
* while a close is being requested.
*/
@Override
public void close() {
// close all the thread-local pools, then discard the current ThreadLocal pool.
// Atomically set a new pool to be used: useful for live-reloading.
final ThreadLocalPoolSet previousPool = poolset.getAndSet(new ThreadLocalPoolSet());
previousPool.close();
}

private class ThreadLocalPoolSet {
final List<Pool> threadLocalPools = new ArrayList<>();
final ThreadLocal<PoolType> threadLocal = new ThreadLocal<>();
final StampedLock stampedLock = new StampedLock();
boolean isOpen = true;

public PoolType getPool() {
final long optimisticRead = stampedLock.tryOptimisticRead();
if (isOpen == false) {
//Let the caller re-try on a different instance
return null;
}
PoolType ret = threadLocal.get();
if (ret != null) {
if (stampedLock.validate(optimisticRead)) {
return ret;
} else {
//On invalid optimisticRead stamp, it means this pool instance was closed:
//let the caller re-try on a different instance
return null;
}
} else {
//Now acquire an exclusive readlock:
final long readLock = stampedLock.tryConvertToReadLock(optimisticRead);
//Again, on failure the pool was closed, return null in such case.
if (readLock == 0)
return null;
//else, we own the exclusive read lock and can now enter our slow path:
try {
log.debugf("Making pool for thread: %s", Thread.currentThread());
ret = createThreadLocalPool();
synchronized (threadLocalPools) {
threadLocalPools.add(ret);
}
threadLocal.set(ret);
return ret;
} finally {
stampedLock.unlockRead(readLock);
}
}
}

public void close() {
final long lock = stampedLock.writeLock();
try {
isOpen = false;
//While this synchronized block might take a while as we have to close all
//pool instances, it shouldn't block the getPool method as contention is
//prevented by the exclusive stamped lock.
synchronized (threadLocalPools) {
for (Pool pool : threadLocalPools) {
log.debugf("Closing pool: %s", pool);
pool.close();
}
}
} finally {
stampedLock.unlockWrite(lock);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntim
dataSourceReactiveDB2Config);
DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalDB2Pool(vertx, connectOptions, poolOptions);
}
return DB2Pool.pool(vertx, connectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.db2.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.db2client.DB2Pool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalDB2Pool extends ThreadLocalPool<DB2Pool> implements DB2Pool {

private final DB2ConnectOptions db2ConnectOptions;

public ThreadLocalDB2Pool(Vertx vertx, DB2ConnectOptions db2ConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.db2ConnectOptions = db2ConnectOptions;
}

@Override
protected DB2Pool createThreadLocalPool() {
return DB2Pool.pool(vertx, db2ConnectOptions, poolOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt
dataSourceReactiveMySQLConfig);
MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalMySQLPool(vertx, mysqlConnectOptions, poolOptions);
}
return MySQLPool.pool(vertx, mysqlConnectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.mysql.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalMySQLPool extends ThreadLocalPool<MySQLPool> implements MySQLPool {

private final MySQLConnectOptions mySQLConnectOptions;

public ThreadLocalMySQLPool(Vertx vertx, MySQLConnectOptions mySQLConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.mySQLConnectOptions = mySQLConnectOptions;
}

@Override
protected MySQLPool createThreadLocalPool() {
return MySQLPool.pool(vertx, mySQLConnectOptions, poolOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ private PgPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntime
dataSourceReactivePostgreSQLConfig);
PgConnectOptions pgConnectOptions = toPgConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactivePostgreSQLConfig);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalPgPool(vertx, pgConnectOptions, poolOptions);
}
return PgPool.pool(vertx, pgConnectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.pg.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalPgPool extends ThreadLocalPool<PgPool> implements PgPool {

private final PgConnectOptions pgConnectOptions;

public ThreadLocalPgPool(Vertx vertx, PgConnectOptions pgConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.pgConnectOptions = pgConnectOptions;
}

@Override
protected PgPool createThreadLocalPool() {
return PgPool.pool(vertx, pgConnectOptions, poolOptions);
}
}
10 changes: 10 additions & 0 deletions integration-tests/reactive-pg-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
Expand All @@ -51,6 +56,11 @@
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
<targetPath>../test-classes</targetPath>
</resource>
</resources>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.quarkus.it.reactive.pg.client;

import java.util.concurrent.CompletionStage;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;

@Path("/hot-fruits")
public class HotReloadFruitResource {

@Inject
PgPool client;

@PostConstruct
void setupDb() {
client.query("DROP TABLE IF EXISTS fruits").execute()
.flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Orange')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pear')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Apple')").execute())
.await().indefinitely();
}

@GET
@Produces(MediaType.APPLICATION_JSON)
public CompletionStage<JsonArray> listFruits() {
return client.query("SELECT * FROM fruits").execute()
.map(pgRowSet -> {
JsonArray jsonArray = new JsonArray();
for (Row row : pgRowSet) {
jsonArray.add(toJson(row));
}
return jsonArray;
})
.subscribeAsCompletionStage();
}

private JsonObject toJson(Row row) {
return new JsonObject()
.put("id", row.getLong("id"))
.put("name", row.getString("name"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=hibernate_orm_test
quarkus.datasource.password=hibernate_orm_test
quarkus.datasource.reactive.url=${reactive-postgres.url}
quarkus.datasource.reactive.thread-local=true
quarkus.log.category."io.quarkus.reactive.datasource".level=DEBUG
Loading

0 comments on commit 7c01526

Please sign in to comment.