Skip to content

Commit

Permalink
Rethink closing strategy of the ThreadLocalPool optimised pgpool
Browse files Browse the repository at this point in the history
  • Loading branch information
Sanne committed Jun 24, 2020
1 parent c00822d commit 6d19026
Showing 1 changed file with 79 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.StampedLock;

import org.jboss.logging.Logger;

Expand All @@ -22,8 +23,7 @@ public abstract class ThreadLocalPool<PoolType extends Pool> implements Pool {

private static final Logger log = Logger.getLogger(ThreadLocalPool.class);

private final AtomicReference<ThreadLocal<PoolType>> pool = new AtomicReference<>(new ThreadLocal<>());
private static final List<Pool> threadLocalPools = new ArrayList<>();
private final AtomicReference<ThreadLocalPoolSet> poolset = new AtomicReference<>(new ThreadLocalPoolSet());

protected final PoolOptions poolOptions;
protected final Vertx vertx;
Expand All @@ -34,17 +34,17 @@ public ThreadLocalPool(Vertx vertx, PoolOptions poolOptions) {
}

private PoolType pool() {
ThreadLocal<PoolType> poolThreadLocal = pool.get();
PoolType ret = poolThreadLocal.get();
if (ret == null) {
log.debugf("Making pool for thread: %s", Thread.currentThread());
ret = createThreadLocalPool();
synchronized (threadLocalPools) {
threadLocalPools.add(ret);
}
poolThreadLocal.set(ret);
//We re-try to be nice on an extremely unlikely race condition.
//3 attempts should be more than enough:
//especially consider that if this race is triggered, then someone is trying to use the pool on shutdown,
//which is inherently a broken plan.
for (int i = 0; i < 3; i++) {
final ThreadLocalPoolSet currentConnections = poolset.get();
PoolType p = currentConnections.getPool();
if (p != null)
return p;
}
return ret;
throw new IllegalStateException("Multiple attempts to reopen a new pool on a closed instance: aborting");
}

protected abstract PoolType createThreadLocalPool();
Expand Down Expand Up @@ -74,18 +74,77 @@ public void begin(Handler<AsyncResult<Transaction>> handler) {
* called from a single thread, when doing shutdown, and needs to close all the
* pools and reinitialise the thread local so that all newly created pools after
* the restart will start with an empty thread local instead of a closed one.
* N.B. while we take care of the pool to behave as best as we can,
* it's responsibility of the user of the returned pools to not use them
* while a close is being requested.
*/
@Override
public void close() {
// close all the thread-local pools
synchronized (threadLocalPools) {
for (Pool pool : threadLocalPools) {
log.debugf("Closing pool: %s", pool);
pool.close();
// close all the thread-local pools, then discard the current ThreadLocal pool.
// Atomically set a new pool to be used: useful for live-reloading.
final ThreadLocalPoolSet previousPool = poolset.getAndSet(new ThreadLocalPoolSet());
previousPool.close();
}

private class ThreadLocalPoolSet {
final List<Pool> threadLocalPools = new ArrayList<>();
final ThreadLocal<PoolType> threadLocal = new ThreadLocal<>();
final StampedLock stampedLock = new StampedLock();
boolean isOpen = true;

public PoolType getPool() {
final long optimisticRead = stampedLock.tryOptimisticRead();
if (isOpen == false) {
//Let the caller re-try on a different instance
return null;
}
PoolType ret = threadLocal.get();
if (ret != null) {
if (stampedLock.validate(optimisticRead)) {
return ret;
} else {
//On invalid optimisticRead stamp, it means this pool instance was closed:
//let the caller re-try on a different instance
return null;
}
} else {
//Now acquire an exclusive readlock:
final long readLock = stampedLock.tryConvertToReadLock(optimisticRead);
//Again, on failure the pool was closed, return null in such case.
if (readLock == 0)
return null;
//else, we own the exclusive read lock and can now enter our slow path:
try {
log.debugf("Making pool for thread: %s", Thread.currentThread());
ret = createThreadLocalPool();
synchronized (threadLocalPools) {
threadLocalPools.add(ret);
}
threadLocal.set(ret);
return ret;
} finally {
stampedLock.unlockRead(readLock);
}
}
}

public void close() {
final long lock = stampedLock.writeLock();
try {
isOpen = false;
//While this synchronized block might take a while as we have to close all
//pool instances, it shouldn't block the getPool method as contention is
//prevented by the exclusive stamped lock.
synchronized (threadLocalPools) {
for (Pool pool : threadLocalPools) {
log.debugf("Closing pool: %s", pool);
pool.close();
}
}
} finally {
stampedLock.unlockWrite(lock);
}
threadLocalPools.clear();
}
// discard the TL to clear them all
pool.set(new ThreadLocal<PoolType>());
}

}

0 comments on commit 6d19026

Please sign in to comment.