diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
index 84e9b3790dbf..fc05424eeaa5 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -82,12 +84,63 @@ protected void doStop() throws Exception
@Override
public CompletableFuture preCreateConnections(int connectionCount)
{
- CompletableFuture>[] futures = new CompletableFuture[connectionCount];
- for (int i = 0; i < connectionCount; i++)
+ if (LOG.isDebugEnabled())
+ LOG.debug("Precreating connections {}/{}", connectionCount, getMaxConnectionCount());
+
+ List> futures = new ArrayList<>();
+ loop : for (int i = 0; i < connectionCount; i++)
{
- futures[i] = tryCreateAsync(getMaxConnectionCount());
+ Pool.Entry entry = pool.reserve();
+ while (entry == null)
+ {
+ if (pool.size() >= pool.getMaxEntries())
+ break loop;
+ if (pool.getMaxMultiplex() <= 1)
+ throw new IllegalStateException();
+ entry = pool.reserve();
+ }
+
+ final Pool.Entry reserved = entry;
+
+ Promise.Completable future = new Promise.Completable()
+ {
+ @Override
+ public void succeeded(Connection connection)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
+ if (connection instanceof Attachable)
+ {
+ ((Attachable)connection).setAttachment(reserved);
+ onCreated(connection);
+ reserved.enable(connection, false);
+ idle(connection, false);
+ complete(null);
+ proceed();
+ }
+ else
+ {
+ failed(new IllegalArgumentException("Invalid connection object: " + connection));
+ }
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Connection creation failed {}", reserved, x);
+ reserved.remove();
+ completeExceptionally(x);
+ requester.failed(x);
+ }
+ };
+
+ futures.add(future);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Creating connection {}/{} at {}", futures.size() + 1, getMaxConnectionCount(), reserved);
+ destination.newConnection(future);
}
- return CompletableFuture.allOf(futures);
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
protected int getMaxMultiplex()
@@ -200,7 +253,7 @@ public Connection acquire()
*
* @param create a hint to attempt to open a new connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
- * @see #tryCreate(int)
+ * @see #tryCreate()
*/
protected Connection acquire(boolean create)
{
@@ -210,20 +263,7 @@ protected Connection acquire(boolean create)
if (connection == null)
{
if (create || isMaximizeConnections())
- {
- // Try to forcibly create a connection if none is available.
- tryCreate(-1);
- }
- else
- {
- // QueuedRequests may be stale and different from pool.pending.
- // So tryCreate() may be a no-operation (when queuedRequests < pool.pending);
- // or tryCreate() may create more connections than necessary, when
- // queuedRequests read below is stale and some request has just been
- // dequeued to be processed causing queuedRequests > pool.pending.
- int queuedRequests = destination.getQueuedRequestCount();
- tryCreate(queuedRequests);
- }
+ tryCreate();
connection = activate();
}
return connection;
@@ -236,28 +276,22 @@ protected Connection acquire(boolean create)
* then this method returns without scheduling the opening of a new connection;
* if {@code maxPending} is negative, a new connection is always scheduled for opening.
*
- * @param maxPending the max desired number of connections scheduled for opening,
* or a negative number to always trigger the opening of a new connection
*/
- protected void tryCreate(int maxPending)
- {
- tryCreateAsync(maxPending);
- }
-
- private CompletableFuture> tryCreateAsync(int maxPending)
+ protected void tryCreate()
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
- LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);
+ LOG.debug("Try creating connection {}/{} with {} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount());
- Pool.Entry entry = pool.reserve(maxPending);
+ Pool.Entry entry = pool.reserve();
if (entry == null)
- return CompletableFuture.completedFuture(null);
+ return;
if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);
- Promise.Completable future = new Promise.Completable()
+ Promise future = new Promise()
{
@Override
public void succeeded(Connection connection)
@@ -270,7 +304,6 @@ public void succeeded(Connection connection)
onCreated(connection);
entry.enable(connection, false);
idle(connection, false);
- complete(null);
proceed();
}
else
@@ -285,14 +318,11 @@ public void failed(Throwable x)
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation failed at {}", connectionCount, getMaxConnectionCount(), entry, x);
entry.remove();
- completeExceptionally(x);
requester.failed(x);
}
};
destination.newConnection(future);
-
- return future;
}
protected void proceed()
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java
index ffe72e1ee279..4f8a0a11e5d0 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java
@@ -19,7 +19,6 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
-import org.eclipse.jetty.client.api.Destination;
public class ConnectionPoolHelper
{
@@ -30,6 +29,6 @@ public static Connection acquire(AbstractConnectionPool connectionPool, boolean
public static void tryCreate(AbstractConnectionPool connectionPool, int pending)
{
- connectionPool.tryCreate(pending);
+ connectionPool.tryCreate();
}
}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
index 9a9752cad47e..eefa4b1fa84d 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
@@ -54,7 +54,7 @@ public class Pool implements AutoCloseable, Dumpable
private final List entries = new CopyOnWriteArrayList<>();
private final int maxEntries;
- private final AtomicInteger pending = new AtomicInteger();
+ private final AtomicBiInteger pending = new AtomicBiInteger(); // Lo reserved; Hi demand
private final StrategyType strategyType;
/*
@@ -137,7 +137,12 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache)
public int getReservedCount()
{
- return pending.get();
+ return pending.getLo();
+ }
+
+ public int getDemand()
+ {
+ return pending.getHi();
}
public int getIdleCount()
@@ -216,7 +221,9 @@ public final void setMaxUsageCount(int maxUsageCount)
* @return a disabled entry that is contained in the pool,
* or null if the pool is closed or if the pool already contains
* {@link #getMaxEntries()} entries, or the allotment has already been reserved
+ * @deprecated Use {@link #reserve()}
*/
+ @Deprecated
public Entry reserve(int allotment)
{
try (Locker.Lock l = locker.lock())
@@ -231,9 +238,59 @@ public Entry reserve(int allotment)
// The pending count is an AtomicInteger that is only ever incremented here with
// the lock held. Thus the pending count can be reduced immediately after the
// test below, but never incremented. Thus the allotment limit can be enforced.
- if (allotment >= 0 && (pending.get() * getMaxMultiplex()) >= allotment)
+ if (allotment >= 0 && (pending.getLo() * getMaxMultiplex()) >= allotment)
+ return null;
+ pending.addAndGetLo(1);
+
+ Entry entry = new Entry();
+ entries.add(entry);
+ return entry;
+ }
+ }
+
+ /**
+ * Create a new disabled slot into the pool.
+ * The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
+ * method called or be removed via {@link Pool.Entry#remove()} or
+ * {@link Pool#remove(Pool.Entry)}.
+ *
For multiplexed entries, a call to reserve may return null if a previously
+ * reserved entry has excess capacity, which is determined by each call to
+ * reserve() incrementing demand.
+ *
+ * @return a disabled entry that is contained in the pool,
+ * or null if the pool is closed or if the pool already contains
+ * {@link #getMaxEntries()} entries, or the allotment has already been reserved
+ */
+ public Entry reserve()
+ {
+ try (Locker.Lock l = locker.lock())
+ {
+ if (closed)
return null;
- pending.incrementAndGet();
+
+ int space = maxEntries - entries.size();
+ if (space <= 0)
+ return null;
+
+ while (true)
+ {
+ long encoded = pending.get();
+ int reserved = AtomicBiInteger.getLo(encoded);
+ int demand = AtomicBiInteger.getHi(encoded);
+
+ if (reserved * getMaxMultiplex() <= demand)
+ {
+ // we need a new connection
+ if (pending.compareAndSet(encoded, demand + 1, reserved + 1))
+ break;
+ }
+ else
+ {
+ // We increment demand on existing reservations
+ if (pending.compareAndSet(encoded, demand + 1, reserved))
+ return null;
+ }
+ }
Entry entry = new Entry();
entries.add(entry);
@@ -342,7 +399,7 @@ public Entry acquire(Function.Entry, T> creator)
if (entry != null)
return entry;
- entry = reserve(-1);
+ entry = reserve();
if (entry == null)
return null;
@@ -457,12 +514,14 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public String toString()
{
- return String.format("%s@%x[size=%d closed=%s pending=%d]",
+ long encoded = pending.get();
+ return String.format("%s@%x[size=%d closed=%s reserved=%d, demand=%d]",
getClass().getSimpleName(),
hashCode(),
entries.size(),
closed,
- pending.get());
+ AtomicBiInteger.getLo(encoded),
+ AtomicBiInteger.getHi(encoded));
}
public class Entry
@@ -488,7 +547,7 @@ void setUsageCount(int usageCount)
}
/** Enable a reserved entry {@link Entry}.
- * An entry returned from the {@link #reserve(int)} method must be enabled with this method,
+ * An entry returned from the {@link #reserve()} method must be enabled with this method,
* once and only once, before it is usable by the pool.
* The entry may be enabled and not acquired, in which case it is immediately available to be
* acquired, potentially by another thread; or it can be enabled and acquired atomically so that
@@ -517,7 +576,17 @@ public boolean enable(T pooled, boolean acquire)
return false; // Pool has been closed
throw new IllegalStateException("Entry already enabled: " + this);
}
- pending.decrementAndGet();
+
+ while (true)
+ {
+ long encoded = pending.get();
+ int reserved = AtomicBiInteger.getLo(encoded);
+ int demand = AtomicBiInteger.getHi(encoded);
+ reserved = reserved - 1;
+ demand = Math.max(0, demand - getMaxMultiplex());
+ if (pending.compareAndSet(encoded, demand, reserved))
+ break;
+ }
return true;
}
@@ -620,7 +689,7 @@ boolean tryRemove()
if (removed)
{
if (usageCount == Integer.MIN_VALUE)
- pending.decrementAndGet();
+ pending.addAndGetLo(-1);
return newMultiplexCount == 0;
}
}
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java
index a046b395d5a8..add28d11b9d9 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java
@@ -50,7 +50,6 @@
public class PoolTest
{
-
interface Factory
{
Pool getPool(int maxSize);
@@ -71,7 +70,7 @@ public static Stream