Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rethink closing strategy of the ThreadLocalPool optimised pgpool #10228

Merged
merged 3 commits into from
Jun 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,10 @@ public class DataSourceReactiveRuntimeConfig {
*/
@ConfigItem
public PfxConfiguration keyCertificatePfx;

/**
* Experimental: use one connection pool per thread.
*/
@ConfigItem
public Optional<Boolean> threadLocal;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package io.quarkus.reactive.datasource.runtime;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.StampedLock;

import org.jboss.logging.Logger;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.PreparedQuery;
import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.Transaction;

public abstract class ThreadLocalPool<PoolType extends Pool> implements Pool {

private static final Logger log = Logger.getLogger(ThreadLocalPool.class);

private final AtomicReference<ThreadLocalPoolSet> poolset = new AtomicReference<>(new ThreadLocalPoolSet());

protected final PoolOptions poolOptions;
protected final Vertx vertx;

public ThreadLocalPool(Vertx vertx, PoolOptions poolOptions) {
this.vertx = vertx;
this.poolOptions = poolOptions;
}

private PoolType pool() {
//We re-try to be nice on an extremely unlikely race condition.
//3 attempts should be more than enough:
//especially consider that if this race is triggered, then someone is trying to use the pool on shutdown,
//which is inherently a broken plan.
for (int i = 0; i < 3; i++) {
final ThreadLocalPoolSet currentConnections = poolset.get();
PoolType p = currentConnections.getPool();
if (p != null)
return p;
}
throw new IllegalStateException("Multiple attempts to reopen a new pool on a closed instance: aborting");
}

protected abstract PoolType createThreadLocalPool();

@Override
public void getConnection(Handler<AsyncResult<SqlConnection>> handler) {
pool().getConnection(handler);
}

@Override
public Query<RowSet<Row>> query(String sql) {
return pool().query(sql);
}

@Override
public PreparedQuery<RowSet<Row>> preparedQuery(String sql) {
return pool().preparedQuery(sql);
}

@Override
public void begin(Handler<AsyncResult<Transaction>> handler) {
pool().begin(handler);
}

/**
* This is a bit weird because it works on all ThreadLocal pools, but it's only
* called from a single thread, when doing shutdown, and needs to close all the
* pools and reinitialise the thread local so that all newly created pools after
* the restart will start with an empty thread local instead of a closed one.
* N.B. while we take care of the pool to behave as best as we can,
* it's responsibility of the user of the returned pools to not use them
* while a close is being requested.
*/
@Override
public void close() {
// close all the thread-local pools, then discard the current ThreadLocal pool.
// Atomically set a new pool to be used: useful for live-reloading.
final ThreadLocalPoolSet previousPool = poolset.getAndSet(new ThreadLocalPoolSet());
previousPool.close();
}

private class ThreadLocalPoolSet {
final List<Pool> threadLocalPools = new ArrayList<>();
final ThreadLocal<PoolType> threadLocal = new ThreadLocal<>();
final StampedLock stampedLock = new StampedLock();
boolean isOpen = true;

public PoolType getPool() {
final long optimisticRead = stampedLock.tryOptimisticRead();
if (isOpen == false) {
//Let the caller re-try on a different instance
return null;
}
PoolType ret = threadLocal.get();
if (ret != null) {
if (stampedLock.validate(optimisticRead)) {
return ret;
} else {
//On invalid optimisticRead stamp, it means this pool instance was closed:
//let the caller re-try on a different instance
return null;
}
} else {
//Now acquire an exclusive readlock:
final long readLock = stampedLock.tryConvertToReadLock(optimisticRead);
//Again, on failure the pool was closed, return null in such case.
if (readLock == 0)
return null;
//else, we own the exclusive read lock and can now enter our slow path:
try {
log.debugf("Making pool for thread: %s", Thread.currentThread());
ret = createThreadLocalPool();
synchronized (threadLocalPools) {
threadLocalPools.add(ret);
}
threadLocal.set(ret);
return ret;
} finally {
stampedLock.unlockRead(readLock);
}
}
}

public void close() {
final long lock = stampedLock.writeLock();
try {
isOpen = false;
//While this synchronized block might take a while as we have to close all
//pool instances, it shouldn't block the getPool method as contention is
//prevented by the exclusive stamped lock.
synchronized (threadLocalPools) {
for (Pool pool : threadLocalPools) {
log.debugf("Closing pool: %s", pool);
pool.close();
}
}
} finally {
stampedLock.unlockWrite(lock);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntim
dataSourceReactiveDB2Config);
DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalDB2Pool(vertx, connectOptions, poolOptions);
}
return DB2Pool.pool(vertx, connectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.db2.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.db2client.DB2Pool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalDB2Pool extends ThreadLocalPool<DB2Pool> implements DB2Pool {

private final DB2ConnectOptions db2ConnectOptions;

public ThreadLocalDB2Pool(Vertx vertx, DB2ConnectOptions db2ConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.db2ConnectOptions = db2ConnectOptions;
}

@Override
protected DB2Pool createThreadLocalPool() {
return DB2Pool.pool(vertx, db2ConnectOptions, poolOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt
dataSourceReactiveMySQLConfig);
MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalMySQLPool(vertx, mysqlConnectOptions, poolOptions);
}
return MySQLPool.pool(vertx, mysqlConnectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.mysql.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalMySQLPool extends ThreadLocalPool<MySQLPool> implements MySQLPool {

private final MySQLConnectOptions mySQLConnectOptions;

public ThreadLocalMySQLPool(Vertx vertx, MySQLConnectOptions mySQLConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.mySQLConnectOptions = mySQLConnectOptions;
}

@Override
protected MySQLPool createThreadLocalPool() {
return MySQLPool.pool(vertx, mySQLConnectOptions, poolOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ private PgPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntime
dataSourceReactivePostgreSQLConfig);
PgConnectOptions pgConnectOptions = toPgConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactivePostgreSQLConfig);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalPgPool(vertx, pgConnectOptions, poolOptions);
}
return PgPool.pool(vertx, pgConnectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.pg.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalPgPool extends ThreadLocalPool<PgPool> implements PgPool {

private final PgConnectOptions pgConnectOptions;

public ThreadLocalPgPool(Vertx vertx, PgConnectOptions pgConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.pgConnectOptions = pgConnectOptions;
}

@Override
protected PgPool createThreadLocalPool() {
return PgPool.pool(vertx, pgConnectOptions, poolOptions);
}
}
10 changes: 10 additions & 0 deletions integration-tests/reactive-pg-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
Expand All @@ -51,6 +56,11 @@
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
<targetPath>../test-classes</targetPath>
</resource>
</resources>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.quarkus.it.reactive.pg.client;

import java.util.concurrent.CompletionStage;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;

@Path("/hot-fruits")
public class HotReloadFruitResource {

@Inject
PgPool client;

@PostConstruct
void setupDb() {
client.query("DROP TABLE IF EXISTS fruits").execute()
.flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Orange')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pear')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Apple')").execute())
.await().indefinitely();
}

@GET
@Produces(MediaType.APPLICATION_JSON)
public CompletionStage<JsonArray> listFruits() {
return client.query("SELECT * FROM fruits").execute()
.map(pgRowSet -> {
JsonArray jsonArray = new JsonArray();
for (Row row : pgRowSet) {
jsonArray.add(toJson(row));
}
return jsonArray;
})
.subscribeAsCompletionStage();
}

private JsonObject toJson(Row row) {
return new JsonObject()
.put("id", row.getLong("id"))
.put("name", row.getString("name"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=hibernate_orm_test
quarkus.datasource.password=hibernate_orm_test
quarkus.datasource.reactive.url=${reactive-postgres.url}
quarkus.datasource.reactive.thread-local=true
quarkus.log.category."io.quarkus.reactive.datasource".level=DEBUG
Loading