From 6d1902689238c2cde152bb8502f00fb9fe0340d9 Mon Sep 17 00:00:00 2001 From: Sanne Grinovero Date: Wed, 24 Jun 2020 15:01:13 +0100 Subject: [PATCH] Rethink closing strategy of the ThreadLocalPool optimised pgpool --- .../datasource/runtime/ThreadLocalPool.java | 99 +++++++++++++++---- 1 file changed, 79 insertions(+), 20 deletions(-) diff --git a/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ThreadLocalPool.java b/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ThreadLocalPool.java index 5617ff96d3fb3..cacc1330bfc95 100644 --- a/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ThreadLocalPool.java +++ b/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ThreadLocalPool.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.StampedLock; import org.jboss.logging.Logger; @@ -22,8 +23,7 @@ public abstract class ThreadLocalPool implements Pool { private static final Logger log = Logger.getLogger(ThreadLocalPool.class); - private final AtomicReference> pool = new AtomicReference<>(new ThreadLocal<>()); - private static final List threadLocalPools = new ArrayList<>(); + private final AtomicReference poolset = new AtomicReference<>(new ThreadLocalPoolSet()); protected final PoolOptions poolOptions; protected final Vertx vertx; @@ -34,17 +34,17 @@ public ThreadLocalPool(Vertx vertx, PoolOptions poolOptions) { } private PoolType pool() { - ThreadLocal poolThreadLocal = pool.get(); - PoolType ret = poolThreadLocal.get(); - if (ret == null) { - log.debugf("Making pool for thread: %s", Thread.currentThread()); - ret = createThreadLocalPool(); - synchronized (threadLocalPools) { - threadLocalPools.add(ret); - } - poolThreadLocal.set(ret); + //We re-try to be nice on an extremely unlikely race condition. + //3 attempts should be more than enough: + //especially consider that if this race is triggered, then someone is trying to use the pool on shutdown, + //which is inherently a broken plan. + for (int i = 0; i < 3; i++) { + final ThreadLocalPoolSet currentConnections = poolset.get(); + PoolType p = currentConnections.getPool(); + if (p != null) + return p; } - return ret; + throw new IllegalStateException("Multiple attempts to reopen a new pool on a closed instance: aborting"); } protected abstract PoolType createThreadLocalPool(); @@ -74,18 +74,77 @@ public void begin(Handler> handler) { * called from a single thread, when doing shutdown, and needs to close all the * pools and reinitialise the thread local so that all newly created pools after * the restart will start with an empty thread local instead of a closed one. + * N.B. while we take care of the pool to behave as best as we can, + * it's responsibility of the user of the returned pools to not use them + * while a close is being requested. */ @Override public void close() { - // close all the thread-local pools - synchronized (threadLocalPools) { - for (Pool pool : threadLocalPools) { - log.debugf("Closing pool: %s", pool); - pool.close(); + // close all the thread-local pools, then discard the current ThreadLocal pool. + // Atomically set a new pool to be used: useful for live-reloading. + final ThreadLocalPoolSet previousPool = poolset.getAndSet(new ThreadLocalPoolSet()); + previousPool.close(); + } + + private class ThreadLocalPoolSet { + final List threadLocalPools = new ArrayList<>(); + final ThreadLocal threadLocal = new ThreadLocal<>(); + final StampedLock stampedLock = new StampedLock(); + boolean isOpen = true; + + public PoolType getPool() { + final long optimisticRead = stampedLock.tryOptimisticRead(); + if (isOpen == false) { + //Let the caller re-try on a different instance + return null; + } + PoolType ret = threadLocal.get(); + if (ret != null) { + if (stampedLock.validate(optimisticRead)) { + return ret; + } else { + //On invalid optimisticRead stamp, it means this pool instance was closed: + //let the caller re-try on a different instance + return null; + } + } else { + //Now acquire an exclusive readlock: + final long readLock = stampedLock.tryConvertToReadLock(optimisticRead); + //Again, on failure the pool was closed, return null in such case. + if (readLock == 0) + return null; + //else, we own the exclusive read lock and can now enter our slow path: + try { + log.debugf("Making pool for thread: %s", Thread.currentThread()); + ret = createThreadLocalPool(); + synchronized (threadLocalPools) { + threadLocalPools.add(ret); + } + threadLocal.set(ret); + return ret; + } finally { + stampedLock.unlockRead(readLock); + } + } + } + + public void close() { + final long lock = stampedLock.writeLock(); + try { + isOpen = false; + //While this synchronized block might take a while as we have to close all + //pool instances, it shouldn't block the getPool method as contention is + //prevented by the exclusive stamped lock. + synchronized (threadLocalPools) { + for (Pool pool : threadLocalPools) { + log.debugf("Closing pool: %s", pool); + pool.close(); + } + } + } finally { + stampedLock.unlockWrite(lock); } - threadLocalPools.clear(); } - // discard the TL to clear them all - pool.set(new ThreadLocal()); } + }