From 1c2afec09b6e41410827061cc468b64eb4571ade Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 30 Jul 2024 14:39:51 -0600 Subject: [PATCH] Remove some uses of `synchronized` (#3024) Motivation: With java-21 and v-threads synchronization can cause deadlocks because carrier threads cannot release a v-thread that is in a synchronized block. Modifications: Identify the cases where we're using synchronization and move them to the equivalent v-thread safe ReentrantLock. Result: Better v-thread safety. --- .../limiter/api/GradientCapacityLimiter.java | 31 ++++++++++++++----- .../http/utils/CacheConnectionFactory.java | 25 ++++++++++++--- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java index b62f962f63..84e18ff0c7 100644 --- a/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java +++ b/servicetalk-capacity-limiter-api/src/main/java/io/servicetalk/capacity/limiter/api/GradientCapacityLimiter.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.BiPredicate; import java.util.function.LongSupplier; @@ -67,7 +68,7 @@ final class GradientCapacityLimiter implements CapacityLimiter { private static final Logger LOGGER = LoggerFactory.getLogger(GradientCapacityLimiter.class); - private final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(); private final String name; private final int min; @@ -128,12 +129,15 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co int newLimit; Ticket ticket = null; - synchronized (lock) { + lock.lock(); + try { newLimit = (int) limit; if (pending < limit) { newPending = ++pending; ticket = new DefaultTicket(this, newLimit - newPending, newPending); } + } finally { + lock.unlock(); } if (ticket != null) { @@ -143,9 +147,10 @@ public Ticket tryAcquire(final Classification classification, @Nullable final Co } /** - * Needs to be called within a synchronized block. + * Needs to be called while holding the lock. */ private int updateLimit(final long timestampNs, final double shortLatencyMillis, final double longLatencyMillis) { + assert lock.isHeldByCurrentThread(); if (isNaN(longLatencyMillis) || isNaN(shortLatencyMillis) || shortLatencyMillis == 0) { return -1; } @@ -175,7 +180,8 @@ private int onSuccess(final long durationNs) { final long rttMillis = NANOSECONDS.toMillis(durationNs); int newPending; int limit; - synchronized (lock) { + lock.lock(); + try { limit = (int) this.limit; final double longLatencyMillis = longLatency.observe(nowNs, rttMillis); final double shortLatencyMillis = shortLatency.observe(nowNs, rttMillis); @@ -184,6 +190,8 @@ private int onSuccess(final long durationNs) { if ((nowNs - lastSamplingNs) >= limitUpdateIntervalNs) { limit = updateLimit(nowNs, shortLatencyMillis, longLatencyMillis); } + } finally { + lock.unlock(); } observer.onActiveRequestsDecr(); @@ -194,9 +202,12 @@ private int onDrop() { int newPending; double newLimit; - synchronized (lock) { + lock.lock(); + try { newLimit = limit = max(min, limit * (limit >= max ? backoffRatioOnLimit : backoffRatioOnLoss)); newPending = --pending; + } finally { + lock.unlock(); } observer.onActiveRequestsDecr(); @@ -207,9 +218,12 @@ private int onIgnore() { int newPending; double newLimit; - synchronized (lock) { + lock.lock(); + try { newLimit = limit; newPending = --pending; + } finally { + lock.unlock(); } observer.onActiveRequestsDecr(); return (int) (newLimit - newPending); @@ -217,7 +231,8 @@ private int onIgnore() { @Override public String toString() { - synchronized (lock) { + lock.lock(); + try { return "GradientCapacityLimiter{" + ", name='" + name + '\'' + ", min=" + min + @@ -232,6 +247,8 @@ public String toString() { ", limit=" + limit + ", lastSamplingNs=" + lastSamplingNs + '}'; + } finally { + lock.unlock(); } } diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/CacheConnectionFactory.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/CacheConnectionFactory.java index 6791273d5e..5af8aed19e 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/CacheConnectionFactory.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/CacheConnectionFactory.java @@ -29,12 +29,17 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.ToIntFunction; import javax.annotation.Nullable; final class CacheConnectionFactory extends DelegatingConnectionFactory { private static final Logger LOGGER = LoggerFactory.getLogger(CacheConnectionFactory.class); + + private final Lock lock = new ReentrantLock(); + // access to `map` must be protected by `lock`. private final Map> map = new HashMap<>(); private final ToIntFunction maxConcurrencyFunc; @@ -63,7 +68,8 @@ public Single newConnection(final ResolvedAddress resolvedAddress, @Nullable } Single result; - synchronized (map) { + lock.lock(); + try { final Item item1 = map.get(resolvedAddress); if (item1 == null || (result = item1.addSubscriber(maxConcurrency)) == null) { final Item item2 = new Item<>(); @@ -110,8 +116,11 @@ public void onError(final Throwable t) { } private void lockRemoveFromMap() { - synchronized (map) { + lock.lock(); + try { map.remove(resolvedAddress, item2); + } finally { + lock.unlock(); } } }) @@ -123,8 +132,11 @@ public void onSubscribe(final Cancellable cancellable) { // Acquire the lock before cache operator processes cancel, so if it results // in an upstream cancel we will be holding the lock and able to remove the // map entry safely. - synchronized (map) { + lock.lock(); + try { cancellable.cancel(); + } finally { + lock.unlock(); } }); } @@ -153,12 +165,17 @@ private void lockRemoveFromMap() { // pending connection attempt is exceeded. When the single completes we don't need // to cache the connection anymore because the LoadBalancer above will cache the // connection. This also help keep memory down from the map. - synchronized (map) { + lock.lock(); + try { map.remove(resolvedAddress, item2); + } finally { + lock.unlock(); } } }); } + } finally { + lock.unlock(); } return result.shareContextOnSubscribe();