Skip to content

Commit

Permalink
Fixes #4904 - WebsocketClient creates more connections than needed.
Browse files Browse the repository at this point in the history
More fixes to the connection pool logic.
Now the connection creation is conditional, triggered by
explicit send() or failures.
The connection creation is not triggered _after_ a send(),
where we aggressively send more queued requests - or
in release(), where we send queued request after a previous
one was completed.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed May 28, 2020
1 parent 05f538d commit f33a12b
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,17 @@ public boolean isClosed()

@Override
public Connection acquire()
{
return acquire(true);
}

protected Connection acquire(boolean create)
{
Connection connection = activate();
if (connection == null)
{
tryCreate(destination.getQueuedRequestCount());
if (create)
tryCreate(destination.getQueuedRequestCount());
connection = activate();
}
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public ConnectionPool getConnectionPool()
@Override
public void succeeded()
{
send();
send(false);
}

@Override
Expand Down Expand Up @@ -307,26 +307,36 @@ protected boolean enqueue(Queue<HttpExchange> queue, HttpExchange exchange)
}

public void send()
{
send(true);
}

private void send(boolean create)
{
if (getHttpExchanges().isEmpty())
return;
process();
process(create);
}

private void process()
private void process(boolean create)
{
while (true)
{
Connection connection = connectionPool.acquire();
Connection connection;
if (connectionPool instanceof AbstractConnectionPool)
connection = ((AbstractConnectionPool)connectionPool).acquire(create);
else
connection = connectionPool.acquire();
if (connection == null)
break;
boolean proceed = process(connection);
if (!proceed)
ProcessResult result = process(connection);
if (result == ProcessResult.FINISH)
break;
create = result == ProcessResult.RESTART;
}
}

public boolean process(Connection connection)
public ProcessResult process(Connection connection)
{
HttpClient client = getHttpClient();
HttpExchange exchange = getHttpExchanges().poll();
Expand All @@ -342,7 +352,7 @@ public boolean process(Connection connection)
LOG.debug("{} is stopping", client);
connection.close();
}
return false;
return ProcessResult.FINISH;
}
else
{
Expand All @@ -359,25 +369,28 @@ public boolean process(Connection connection)
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
}
else

SendFailure result = send(connection, exchange);
if (result == null)
{
SendFailure result = send(connection, exchange);
if (result != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", result, exchange);
if (result.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return false;
}
request.abort(result.failure);
}
// Aggressively send other queued requests
// in case connections are multiplexed.
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
}
return getHttpExchanges().peek() != null;

if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", result, exchange);
if (result.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return ProcessResult.FINISH;
}
request.abort(result.failure);
return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
}
}

Expand Down Expand Up @@ -419,7 +432,7 @@ public void release(Connection connection)
if (connectionPool.isActive(connection))
{
if (connectionPool.release(connection))
send();
send(false);
else
connection.close();
}
Expand Down Expand Up @@ -456,7 +469,7 @@ public void close(Connection connection)
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries.
if (removed)
process();
process(true);
}
}

Expand Down Expand Up @@ -581,4 +594,9 @@ private void schedule(long expiresAt)
}
}
}

private enum ProcessResult
{
RESTART, CONTINUE, FINISH
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ public void setMaxMultiplex(int maxMultiplex)
}
}

@Override
protected Connection acquire(boolean create)
{
// The nature of this connection pool is such that a
// connection must always be present in the next slot.
return super.acquire(true);
}

@Override
protected void onCreated(Connection connection)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -61,9 +65,9 @@ public class ConnectionPoolTest
public static Stream<ConnectionPoolFactory> pools()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, 8, destination)),
new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, 8, destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, 8, destination, 1))
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
}

Expand Down Expand Up @@ -238,8 +242,11 @@ else if (serverClose)
@MethodSource("pools")
public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
start(factory.factory, new EmptyServerHandler());
startServer(new EmptyServerHandler());

HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport, null);
long delay = 1000;
client.setSocketAddressResolver(new SocketAddressResolver.Sync()
{
Expand All @@ -260,6 +267,7 @@ public void resolve(String host, int port, Promise<List<InetSocketAddress>> prom
});
}
});
client.start();

CountDownLatch latch = new CountDownLatch(2);
client.newRequest("localhost", connector.getLocalPort())
Expand All @@ -286,6 +294,63 @@ public void resolve(String host, int port, Promise<List<InetSocketAddress>> prom
assertEquals(2, connectionPool.getConnectionCount());
}

@ParameterizedTest
@MethodSource("pools")
public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
// Round robin connection pool does open a few more connections than expected.
Assumptions.assumeFalse(factory.name.equals("round-robin"));

startServer(new EmptyServerHandler());

int count = 500;
QueuedThreadPool clientThreads = new QueuedThreadPool(2 * count);
clientThreads.setName("client");
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport, null);
client.setExecutor(clientThreads);
client.setMaxConnectionsPerDestination(2 * count);
client.setSocketAddressResolver(new SocketAddressResolver.Sync()
{
@Override
public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise)
{
client.getExecutor().execute(() ->
{
try
{
Thread.sleep(100);
super.resolve(host, port, promise);
}
catch (InterruptedException x)
{
promise.failed(x);
}
});
}
});
client.start();

CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort())
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
}));
}

assertTrue(latch.await(count, TimeUnit.SECONDS));
List<Destination> destinations = client.getDestinations();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertThat(connectionPool.getConnectionCount(), Matchers.lessThanOrEqualTo(count));
}

private static class ConnectionPoolFactory
{
private final String name;
Expand Down
Loading

0 comments on commit f33a12b

Please sign in to comment.