Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotate with Nullable, fields that can become null #277

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/PoolBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.function.Function;
import java.util.function.Predicate;

import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;

import reactor.core.Disposable;
Expand Down Expand Up @@ -67,7 +68,7 @@ public static <T> PoolBuilder<T, PoolConfig<T>> from(Publisher<? extends T> allo
final Mono<T> allocator;
final Function<PoolConfig<T>, CONF> configModifier;
int maxPending = -1;
AllocationStrategy allocationStrategy = null;
@Nullable AllocationStrategy allocationStrategy = null;
Function<T, ? extends Publisher<Void>> releaseHandler = noopHandler();
Function<T, ? extends Publisher<Void>> destroyHandler = noopHandler();
BiPredicate<T, PooledRefMetadata> evictionPredicate = neverPredicate();
Expand Down
15 changes: 9 additions & 6 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class SimpleDequePool<POOLABLE> extends AbstractPool<POOLABLE> {

volatile Deque<QueuePooledRef<POOLABLE>> idleResources;
@SuppressWarnings("rawtypes")
protected static final AtomicReferenceFieldUpdater<SimpleDequePool, Deque> IDLE_RESOURCES =
protected static final AtomicReferenceFieldUpdater<SimpleDequePool, @Nullable Deque> IDLE_RESOURCES =
AtomicReferenceFieldUpdater.newUpdater(SimpleDequePool.class, Deque.class, "idleResources");

volatile int acquired;
Expand Down Expand Up @@ -97,7 +97,7 @@ public class SimpleDequePool<POOLABLE> extends AbstractPool<POOLABLE> {
private static final AtomicIntegerFieldUpdater<SimpleDequePool> IDLE_SIZE =
AtomicIntegerFieldUpdater.newUpdater(SimpleDequePool.class, "idleSize");

Disposable evictionTask;
@Nullable Disposable evictionTask;

SimpleDequePool(PoolConfig<POOLABLE> poolConfig) {
super(poolConfig, Loggers.getLogger(SimpleDequePool.class));
Expand Down Expand Up @@ -187,7 +187,9 @@ public Mono<Void> disposeLater() {
PENDING.getAndSet(this, TERMINATED);
if (q != TERMINATED) {
//stop reaper thread
this.evictionTask.dispose();
if (this.evictionTask != null) {
this.evictionTask.dispose();
}

Borrower<POOLABLE> p;
while ((p = q.pollFirst()) != null) {
Expand Down Expand Up @@ -750,8 +752,8 @@ private static final class QueuePoolRecyclerInner<T>
final SimpleDequePool<T> pool;

//poolable can be checked for null to protect against protocol errors
QueuePooledRef<T> pooledRef;
Subscription upstream;
@Nullable QueuePooledRef<T> pooledRef;
@Nullable Subscription upstream;
long start;

//once protects against multiple requests
Expand Down Expand Up @@ -829,6 +831,7 @@ public void onSubscribe(Subscription s) {

@Override
public void request(long l) {
assert upstream != null;
if (Operators.validate(l)) {
upstream.request(l);
// we decrement ACQUIRED EXACTLY ONCE to indicate that the poolable was released by the user
Expand Down Expand Up @@ -867,7 +870,7 @@ private static final class QueuePoolRecyclerMono<T> extends Mono<Void>
implements Scannable {

final Publisher<Void> source;
final AtomicReference<QueuePooledRef<T>> slotRef;
final AtomicReference<@Nullable QueuePooledRef<T>> slotRef;

QueuePoolRecyclerMono(Publisher<Void> source, QueuePooledRef<T> poolSlot) {
this.source = source;
Expand Down
Loading