From 86f75889a16273e33bf1da47df39a9c46673ad31 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 12 Feb 2025 11:39:09 +0200 Subject: [PATCH 1/2] Annotate with Nullable, fields that can become null Signed-off-by: Violeta Georgieva --- .../src/main/java/reactor/pool/PoolBuilder.java | 3 ++- .../main/java/reactor/pool/SimpleDequePool.java | 16 +++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java b/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java index d8496d3..2f1584d 100644 --- a/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java +++ b/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java @@ -26,6 +26,7 @@ import java.util.function.Function; import java.util.function.Predicate; +import org.jspecify.annotations.Nullable; import org.reactivestreams.Publisher; import reactor.core.Disposable; @@ -67,7 +68,7 @@ public static PoolBuilder> from(Publisher allo final Mono allocator; final Function, CONF> configModifier; int maxPending = -1; - AllocationStrategy allocationStrategy = null; + @Nullable AllocationStrategy allocationStrategy = null; Function> releaseHandler = noopHandler(); Function> destroyHandler = noopHandler(); BiPredicate evictionPredicate = neverPredicate(); diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index c39eb55..38381c7 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -68,7 +68,7 @@ public class SimpleDequePool extends AbstractPool { volatile Deque> idleResources; @SuppressWarnings("rawtypes") - protected static final AtomicReferenceFieldUpdater IDLE_RESOURCES = + protected static final AtomicReferenceFieldUpdater IDLE_RESOURCES = AtomicReferenceFieldUpdater.newUpdater(SimpleDequePool.class, Deque.class, "idleResources"); volatile int acquired; @@ -97,7 +97,7 @@ public class SimpleDequePool extends AbstractPool { private static final AtomicIntegerFieldUpdater IDLE_SIZE = AtomicIntegerFieldUpdater.newUpdater(SimpleDequePool.class, "idleSize"); - Disposable evictionTask; + @Nullable Disposable evictionTask; SimpleDequePool(PoolConfig poolConfig) { super(poolConfig, Loggers.getLogger(SimpleDequePool.class)); @@ -187,7 +187,9 @@ public Mono disposeLater() { PENDING.getAndSet(this, TERMINATED); if (q != TERMINATED) { //stop reaper thread - this.evictionTask.dispose(); + if (this.evictionTask != null) { + this.evictionTask.dispose(); + } Borrower p; while ((p = q.pollFirst()) != null) { @@ -750,8 +752,8 @@ private static final class QueuePoolRecyclerInner final SimpleDequePool pool; //poolable can be checked for null to protect against protocol errors - QueuePooledRef pooledRef; - Subscription upstream; + @Nullable QueuePooledRef pooledRef; + @Nullable Subscription upstream; long start; //once protects against multiple requests @@ -829,7 +831,7 @@ public void onSubscribe(Subscription s) { @Override public void request(long l) { - if (Operators.validate(l)) { + if (upstream != null && Operators.validate(l)) { upstream.request(l); // we decrement ACQUIRED EXACTLY ONCE to indicate that the poolable was released by the user if (ONCE.compareAndSet(this, 0, 1)) { @@ -867,7 +869,7 @@ private static final class QueuePoolRecyclerMono extends Mono implements Scannable { final Publisher source; - final AtomicReference> slotRef; + final AtomicReference<@Nullable QueuePooledRef> slotRef; QueuePoolRecyclerMono(Publisher source, QueuePooledRef poolSlot) { this.source = source; From c2db5a5c065a4910675812b53079a7d83cb3cda1 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 12 Feb 2025 12:14:59 +0200 Subject: [PATCH 2/2] Address feedback Signed-off-by: Violeta Georgieva --- reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 38381c7..adda097 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -831,7 +831,8 @@ public void onSubscribe(Subscription s) { @Override public void request(long l) { - if (upstream != null && Operators.validate(l)) { + assert upstream != null; + if (Operators.validate(l)) { upstream.request(l); // we decrement ACQUIRED EXACTLY ONCE to indicate that the poolable was released by the user if (ONCE.compareAndSet(this, 0, 1)) {