Skip to content

Commit

Permalink
Move connection listener to ConnectionManager (#32992)
Browse files Browse the repository at this point in the history
This is a followup to #31886. After that commit the
TransportConnectionListener had to be propogated to both the
Transport and the ConnectionManager. This commit moves that listener
to completely live in the ConnectionManager. The request and response
related methods are moved to a TransportMessageListener. That listener
continues to live in the Transport class.
  • Loading branch information
Tim-Brooks authored Aug 21, 2018
1 parent 50426a6 commit 2c75504
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,6 @@ protected MockTransportService build(Settings settings, Version version, Cluster
return transportService;
}

@Override
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
final Netty4Transport t = (Netty4Transport) transport;
@SuppressWarnings("unchecked")
final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}

public void testConnectException() throws UnknownHostException {
try {
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void removeListener(TransportConnectionListener listener) {
}

public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile));
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
return internalOpenConnection(node, resolvedProfile);
}

/**
Expand All @@ -115,7 +116,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
}
boolean success = false;
try {
connection = transport.openConnection(node, resolvedProfile);
connection = internalOpenConnection(node, resolvedProfile);
connectionValidator.accept(connection, resolvedProfile);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, connection);
Expand Down Expand Up @@ -227,6 +228,19 @@ public void close() {
}
}

private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Transport.Connection connection = transport.openConnection(node, connectionProfile);
try {
connectionListener.onConnectionOpened(connection);
} finally {
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
}
if (connection.isClosed()) {
throw new ConnectTransportException(node, "a channel closed while connecting");
}
return connection;
}

private void ensureOpen() {
if (lifecycle.started() == false) {
throw new IllegalStateException("connection manager is closed");
Expand Down Expand Up @@ -289,6 +303,20 @@ public void onNodeConnected(DiscoveryNode node) {
listener.onNodeConnected(node);
}
}

@Override
public void onConnectionOpened(Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionOpened(connection);
}
}

@Override
public void onConnectionClosed(Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionClosed(connection);
}
}
}

static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
Expand Down
78 changes: 18 additions & 60 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final NetworkService networkService;
protected final Set<ProfileSettings> profileSettings;

private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener();
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();

private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();

Expand Down Expand Up @@ -246,14 +246,12 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
protected void doStart() {
}

@Override
public void addConnectionListener(TransportConnectionListener listener) {
transportListener.listeners.add(listener);
public void addMessageListener(TransportMessageListener listener) {
messageListener.listeners.add(listener);
}

@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
return transportListener.listeners.remove(listener);
public boolean removeMessageListener(TransportMessageListener listener) {
return messageListener.listeners.remove(listener);
}

@Override
Expand Down Expand Up @@ -342,10 +340,6 @@ public TcpChannel channel(TransportRequestOptions.Type type) {
return connectionTypeHandle.getChannel(channels);
}

boolean allChannelsOpen() {
return channels.stream().allMatch(TcpChannel::isOpen);
}

@Override
public boolean sendPing() {
for (TcpChannel channel : channels) {
Expand Down Expand Up @@ -479,22 +473,13 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect
// underlying channels.
nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
final NodeChannels finalNodeChannels = nodeChannels;
try {
transportListener.onConnectionOpened(nodeChannels);
} finally {
nodeChannels.addCloseListener(ActionListener.wrap(() -> transportListener.onConnectionClosed(finalNodeChannels)));
}

Consumer<TcpChannel> onClose = c -> {
assert c.isOpen() == false : "channel is still open when onClose is called";
finalNodeChannels.close();
};

nodeChannels.channels.forEach(ch -> ch.addCloseListener(ActionListener.wrap(() -> onClose.accept(ch))));

if (nodeChannels.allChannelsOpen() == false) {
throw new ConnectTransportException(node, "a channel closed while connecting");
}
success = true;
return nodeChannels;
} catch (ConnectTransportException e) {
Expand Down Expand Up @@ -895,7 +880,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha
final TransportRequestOptions finalOptions = options;
// this might be called in a different thread
SendListener onRequestSent = new SendListener(channel, stream,
() -> transportListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
() -> messageListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
internalSendMessage(channel, message, onRequestSent);
addedReleaseListener = true;
} finally {
Expand Down Expand Up @@ -949,7 +934,7 @@ public void sendErrorResponse(
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
CompositeBytesReference message = new CompositeBytesReference(header, bytes);
SendListener onResponseSent = new SendListener(channel, null,
() -> transportListener.onResponseSent(requestId, action, error), message.length());
() -> messageListener.onResponseSent(requestId, action, error), message.length());
internalSendMessage(channel, message, onResponseSent);
}
}
Expand Down Expand Up @@ -998,7 +983,7 @@ private void sendResponse(
final TransportResponseOptions finalOptions = options;
// this might be called in a different thread
SendListener listener = new SendListener(channel, stream,
() -> transportListener.onResponseSent(requestId, action, response, finalOptions), message.length());
() -> messageListener.onResponseSent(requestId, action, response, finalOptions), message.length());
internalSendMessage(channel, message, listener);
addedReleaseListener = true;
} finally {
Expand Down Expand Up @@ -1193,7 +1178,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel,
if (isHandshake) {
handler = pendingHandshakes.remove(requestId);
} else {
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, transportListener);
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
if (theHandler == null && TransportStatus.isError(status)) {
handler = pendingHandshakes.remove(requestId);
} else {
Expand Down Expand Up @@ -1300,7 +1285,7 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str
features = Collections.emptySet();
}
final String action = stream.readString();
transportListener.onRequestReceived(requestId, action);
messageListener.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
if (TransportStatus.isHandshake(status)) {
Expand Down Expand Up @@ -1609,69 +1594,42 @@ public ProfileSettings(Settings settings, String profileName) {
}
}

private static final class DelegatingTransportConnectionListener implements TransportConnectionListener {
private final List<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
private static final class DelegatingTransportMessageListener implements TransportMessageListener {

private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void onRequestReceived(long requestId, String action) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onRequestReceived(requestId, action);
}
}

@Override
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response, finalOptions);
}
}

@Override
public void onResponseSent(long requestId, String action, Exception error) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, error);
}
}

@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onRequestSent(node, requestId, action, request, finalOptions);
}
}

@Override
public void onNodeDisconnected(DiscoveryNode key) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeDisconnected(key);
}
}

@Override
public void onConnectionOpened(Connection nodeChannels) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionOpened(nodeChannels);
}
}

@Override
public void onNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeConnected(node);
}
}

@Override
public void onConnectionClosed(Connection nodeChannels) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionClosed(nodeChannels);
}
}

@Override
public void onResponseReceived(long requestId, ResponseContext holder) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseReceived(requestId, holder);
}
}
Expand Down
15 changes: 3 additions & 12 deletions server/src/main/java/org/elasticsearch/transport/Transport.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,9 @@ public interface Transport extends LifecycleComponent {
*/
RequestHandlerRegistry getRequestHandler(String action);

/**
* Adds a new event listener
* @param listener the listener to add
*/
void addConnectionListener(TransportConnectionListener listener);
void addMessageListener(TransportMessageListener listener);

/**
* Removes an event listener
* @param listener the listener to remove
* @return <code>true</code> iff the listener was removed otherwise <code>false</code>
*/
boolean removeConnectionListener(TransportConnectionListener listener);
boolean removeMessageListener(TransportMessageListener listener);

/**
* The address the transport is bound on.
Expand Down Expand Up @@ -254,7 +245,7 @@ public List<ResponseContext> prune(Predicate<ResponseContext> predicate) {
* sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
* found.
*/
public TransportResponseHandler onResponseReceived(final long requestId, TransportConnectionListener listener) {
public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) {
ResponseContext context = handlers.remove(requestId);
listener.onResponseReceived(requestId, context);
if (context == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,6 @@
*/
public interface TransportConnectionListener {

/**
* Called once a request is received
* @param requestId the internal request ID
* @param action the request action
*
*/
default void onRequestReceived(long requestId, String action) {}

/**
* Called for every action response sent after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param response the response send
* @param finalOptions the response options
*/
default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {}

/***
* Called for every failed action response after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param error the error sent back to the caller
*/
default void onResponseSent(long requestId, String action, Exception error) {}

/**
* Called for every request sent to a server after the request has been passed to the underlying network implementation
* @param node the node the request was sent to
* @param requestId the internal request id
* @param action the action name
* @param request the actual request
* @param finalOptions the request options
*/
default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {}

/**
* Called once a connection was opened
* @param connection the connection
Expand All @@ -76,13 +40,6 @@ default void onConnectionOpened(Transport.Connection connection) {}
*/
default void onConnectionClosed(Transport.Connection connection) {}

/**
* Called for every response received
* @param requestId the request id for this reponse
* @param context the response context or null if the context was already processed ie. due to a timeout.
*/
default void onResponseReceived(long requestId, Transport.ResponseContext context) {}

/**
* Called once a node connection is opened and registered.
*/
Expand Down
Loading

0 comments on commit 2c75504

Please sign in to comment.