Skip to content

Commit

Permalink
Changes to mark disconnects as part of publish
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Karajgikar <[email protected]>
  • Loading branch information
Rahul Karajgikar committed Sep 2, 2024
1 parent 78b6fdd commit 8411788
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -185,27 +187,6 @@ public void markPendingJoinsAsComplete(List<DiscoveryNode> nodesConnected) {
}
}

/**
 * Disconnects from every tracked node that is absent from {@code discoveryNodes}, except nodes that
 * currently have a join in progress according to the transport service.
 * <p>
 * The disconnect actions are collected while holding {@code mutex} and executed after it is released.
 *
 * @param discoveryNodes nodes to stay connected to
 * @param nodesDelta     not read by this method
 */
public void disconnectFromNonBlockedNodesExcept(DiscoveryNodes discoveryNodes, DiscoveryNodes.Delta nodesDelta) {
    final List<Runnable> disconnectActions = new ArrayList<>();
    synchronized (mutex) {
        // start from everything we track, then drop the nodes that must be kept
        final Set<DiscoveryNode> staleNodes = new HashSet<>(targetsByNode.keySet());
        for (final DiscoveryNode retainedNode : discoveryNodes) {
            staleNodes.remove(retainedNode);
        }

        for (final DiscoveryNode staleNode : staleNodes) {
            // a node that is leaving but also has a pending join keeps its connection
            if (transportService.getNodesJoinInProgress().contains(staleNode) == false) {
                logger.info("NodeConnectionsService - disconnecting from node [{}] in loop", staleNode);
                disconnectActions.add(targetsByNode.get(staleNode).disconnect());
            } else {
                logger.info("Skipping disconnection for node [{}] as it has a join in progress", staleNode);
            }
        }
    }
    // run the collected actions only after the mutex has been released
    for (final Runnable disconnectAction : disconnectActions) {
        disconnectAction.run();
    }
}

void ensureConnections(Runnable onCompletion) {
// Called by tests after some disruption has concluded. It is possible that one or more targets are currently CONNECTING and have
// been since the disruption was active, and that the connection attempt was thwarted by a concurrent disruption to the connection.
Expand Down Expand Up @@ -362,20 +343,30 @@ private class ConnectionTarget {
final AbstractRunnable abstractRunnable = this;

@Override
protected void doRun() {
protected void doRun() throws IOException {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
// if we are trying a connect activity while a node left is in progress, fail
if (transportService.getNodesLeftInProgress().contains(discoveryNode)) {
throw new ConnectTransportException(discoveryNode, "failed to connect while node-left in progress");
}
if (transportService.nodeConnected(discoveryNode)) {
// transportService.connectToNode is a no-op if already connected, but we don't want any DEBUG logging in this case
// since we run this for every node on every cluster state update.
logger.trace("still connected to {}", discoveryNode);
onConnected();
} else {
logger.debug("connecting to {}", discoveryNode);
logger.info("connecting to {}", discoveryNode);
transportService.connectToNode(discoveryNode, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
logger.debug("connected to {}", discoveryNode);
logger.info("connected to {}", discoveryNode);
transportService.markPendingJoinAsCompleted(discoveryNode);
logger.info(
"marked pending join for {} as completed. new list of nodes pending join: {}",
discoveryNode,
transportService.getNodesJoinInProgress()
);
onConnected();
}

Expand Down Expand Up @@ -418,8 +409,13 @@ protected void doRun() {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
logger.info("disconnecting from {}", discoveryNode);
transportService.disconnectFromNode(discoveryNode);
transportService.markPendingLeftAsCompleted(discoveryNode);
consecutiveFailureCount.set(0);
logger.info("disconnected from {}", discoveryNode);
logger.info(
"disconnected from {} and marked pending left as completed. " + "pending lefts: [{}]",
discoveryNode,
transportService.getNodesLeftInProgress()
);
onCompletion(ActivityType.DISCONNECTING, null, connectActivity);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,9 +631,19 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
);
return;
}
// if a node-left is still in progress for the joining node, fail the join request
if (transportService.getNodesLeftInProgress().contains(joinRequest.getSourceNode())) {
joinCallback.onFailure(
new IllegalStateException(
"cannot join node [" + joinRequest.getSourceNode() + "] because node-left is currently in progress for this node"

)
);
return;
}

// cluster manager connects to the node
transportService.connectToNodeAndBlockDisconnects(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
final ClusterState stateForJoinValidation = getStateForClusterManagerService();
if (stateForJoinValidation.nodes().isLocalNodeElectedClusterManager()) {
onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
Expand Down Expand Up @@ -1361,24 +1371,9 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
followersChecker.setCurrentNodes(publishNodes);
lagDetector.setTrackedNodes(publishNodes);
coordinationState.get().handlePrePublish(clusterState);
// join publish is failing
// before we publish, we might need
// can we recreate connections as part of publish if we don't find it?
// reconnect to any nodes that are trying to join, redundancy to avoid node connection wiping by concurrent node-join and
// left
// find diff of nodes from old state and new publishNodes
// this fails because we can't add blocking code to cluster manager thread
// for (DiscoveryNode addedNode : clusterChangedEvent.nodesDelta().addedNodes()) {
// // maybe add a listener here to handle failures
// try {
// transportService.connectToNode(addedNode);
// }
// catch (Exception e) {
// logger.info(() -> new ParameterizedMessage("[{}] failed reconnecting to [{}]", clusterChangedEvent.source(), addedNode),
// e);
// }
// }

// mark pending connects/disconnects before publishing, so that a connect attempted while a
// disconnect is pending for the same node (or vice versa) fails
transportService.markPendingConnections(clusterChangedEvent.nodesDelta());
publication.start(followersChecker.getFaultyNodes());
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl

logger.info("connecting to nodes of cluster state with version {}", newClusterState.version());
try (TimingHandle ignored = stopWatch.timing("connecting to new nodes")) {
connectToNodesAndWaitAndMarkCompletedJoins(newClusterState, clusterChangedEvent.nodesDelta().addedNodes());
// connectToNodesAndWaitAndMarkCompletedJoins(newClusterState, clusterChangedEvent.nodesDelta().addedNodes());
connectToNodesAndWait(newClusterState);
}

// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
Expand All @@ -584,8 +585,8 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
logger.info("apply cluster state with version {}", newClusterState.version());
callClusterStateAppliers(clusterChangedEvent, stopWatch);

// nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
nodeConnectionsService.disconnectFromNonBlockedNodesExcept(newClusterState.nodes(), clusterChangedEvent.nodesDelta());
nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
// nodeConnectionsService.disconnectFromNonBlockedNodesExcept(newClusterState.nodes(), clusterChangedEvent.nodesDelta());

assert newClusterState.coordinationMetadata()
.getLastAcceptedConfiguration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
import org.opensearch.core.action.ActionListener;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -65,7 +65,8 @@ public class ClusterConnectionManager implements ConnectionManager {

private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Void>> pendingConnections = ConcurrentCollections.newConcurrentMap();
private final Set<DiscoveryNode> pendingJoins = new HashSet<>();
private final Set<DiscoveryNode> pendingJoins = ConcurrentCollections.newConcurrentSet();
private final Set<DiscoveryNode> pendingLeft = ConcurrentCollections.newConcurrentSet();
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
@Override
protected void closeInternal() {
Expand Down Expand Up @@ -117,11 +118,33 @@ public Set<DiscoveryNode> getNodesJoinInProgress() {
return this.pendingJoins;
}

/**
 * Returns the set of nodes currently marked as having a node-left in progress.
 * The backing set itself is returned, not a copy.
 */
@Override
public Set<DiscoveryNode> getNodesLeftInProgress() {
return this.pendingLeft;
}

/**
 * Logs the given nodes at INFO level and adds all of them to the pending-joins set.
 *
 * @param nodes nodes to add to the pending-joins set
 */
@Override
public void markPendingJoins(List<DiscoveryNode> nodes) {
logger.info("marking pending join for nodes: [{}]", nodes);
pendingJoins.addAll(nodes);
}

/**
 * Logs the given nodes at INFO level and adds all of them to the pending-left set.
 *
 * @param nodes nodes to add to the pending-left set
 */
@Override
public void markPendingLefts(List<DiscoveryNode> nodes) {
logger.info("marking pending left for nodes: [{}]", nodes);
pendingLeft.addAll(nodes);
}

/**
 * Removes the node from the pending-joins set.
 *
 * @param discoveryNode node to remove
 * @return {@code true} if the node was present in the set
 */
@Override
public boolean markPendingJoinCompleted(DiscoveryNode discoveryNode) {
return pendingJoins.remove(discoveryNode);
}

/**
 * Removes the node from the pending-left set.
 *
 * @param discoveryNode node to remove
 * @return {@code true} if the node was present in the set
 */
@Override
public boolean markPendingLeftCompleted(DiscoveryNode discoveryNode) {
return pendingLeft.remove(discoveryNode);
}

/**
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
* Once a connection is successfully established, it can be validated before being exposed.
Expand Down Expand Up @@ -205,89 +228,6 @@ public void connectToNode(
}));
}

// THIS IS ALMOST A COMPLETE COPY OF connectToNode, with a few lines added for pendingJoins list tracking
/**
 * Connects to {@code node} and records it in {@code pendingJoins}.
 * <p>
 * The node is added to {@code pendingJoins} when it is already connected, when a freshly opened
 * connection turns out to be redundant, and after a new connection has been registered. When a
 * registered connection closes, the node is removed from both {@code connectedNodes} and
 * {@code pendingJoins}.
 * <p>
 * NOTE(review): when another connection attempt to the same node is already pending, this call only
 * chains {@code listener} onto that attempt and does not itself add the node to {@code pendingJoins}
 * - confirm that this is intended.
 *
 * @param node                the node to connect to; a {@code null} node fails the listener with a
 *                            {@link ConnectTransportException}
 * @param connectionProfile   profile that is resolved against {@code defaultProfile}
 * @param connectionValidator validates a newly opened connection before it is registered
 * @param listener            completed once the connection attempt succeeds or fails
 */
@Override
public void connectToNodeAndBlockDisconnects(
DiscoveryNode node,
ConnectionProfile connectionProfile,
ConnectionValidator connectionValidator,
ActionListener<Void> listener
) throws ConnectTransportException {
logger.info("[{}]connecting to node [{}] while blocking disconnects", Thread.currentThread().getName(), node);
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
if (node == null) {
listener.onFailure(new ConnectTransportException(null, "can't connect to a null node"));
return;
}

// take a reference on the manager for this attempt; every exit path below releases it
if (connectingRefCounter.tryIncRef() == false) {
listener.onFailure(new IllegalStateException("connection manager is closed"));
return;
}

// already connected: only record the pending join and complete the listener immediately
if (connectedNodes.containsKey(node)) {
logger.info("connectedNodes already has key for node [{}]", node);
pendingJoins.add(node);
connectingRefCounter.decRef();
listener.onResponse(null);
return;
}

// at most one in-flight attempt per node: putIfAbsent returns the earlier future if one exists
final ListenableFuture<Void> currentListener = new ListenableFuture<>();
final ListenableFuture<Void> existingListener = pendingConnections.putIfAbsent(node, currentListener);
if (existingListener != null) {
try {
// wait on previous entry to complete connection attempt
existingListener.addListener(listener, OpenSearchExecutors.newDirectExecutorService());
} finally {
connectingRefCounter.decRef();
}
return;
}

currentListener.addListener(listener, OpenSearchExecutors.newDirectExecutorService());

// wraps the decRef; run on the success path below and handed to failConnectionListeners on failure
final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> {
connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap(ignored -> {
assert Transports.assertNotTransportThread("connection validator success");
try {
// a non-null result means a connection was registered in the meantime: keep it and close ours
if (connectedNodes.putIfAbsent(node, conn) != null) {
logger.info("existing connection to node [{}], marking as blocked and closing new redundant connection", node);
pendingJoins.add(node);
IOUtils.closeWhileHandlingException(conn);
} else {
logger.info("connected to node [{}]", node);
try {
connectionListener.onNodeConnected(node, conn);
} finally {
// runs even if onNodeConnected throws: mark the pending join and install the close hook
pendingJoins.add(node);
final Transport.Connection finalConnection = conn;
conn.addCloseListener(ActionListener.wrap(() -> {
logger.info("unregistering {} after connection close and marking as disconnected", node);
connectedNodes.remove(node, finalConnection);
pendingJoins.remove(node);
connectionListener.onNodeDisconnected(node, conn);
}));
}
}
} finally {
// clear the in-flight entry, release the reference and complete all chained listeners
ListenableFuture<Void> future = pendingConnections.remove(node);
assert future == currentListener : "Listener in pending map is different than the expected listener";
releaseOnce.run();
future.onResponse(null);
}
}, e -> {
assert Transports.assertNotTransportThread("connection validator failure");
IOUtils.closeWhileHandlingException(conn);
failConnectionListeners(node, releaseOnce, e, currentListener);
}));
}, e -> {
assert Transports.assertNotTransportThread("internalOpenConnection failure");
failConnectionListeners(node, releaseOnce, e, currentListener);
}));
}

/**
* Returns a connection for the given node if the node is connected.
* Connections returned from this method must not be closed. The lifecycle of this connection is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.core.action.ActionListener;

import java.io.Closeable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

Expand All @@ -59,21 +60,22 @@ void connectToNode(
ActionListener<Void> listener
) throws ConnectTransportException;

void connectToNodeAndBlockDisconnects(
DiscoveryNode node,
ConnectionProfile connectionProfile,
ConnectionValidator connectionValidator,
ActionListener<Void> listener
) throws ConnectTransportException;

Transport.Connection getConnection(DiscoveryNode node);

boolean nodeConnected(DiscoveryNode node);

Set<DiscoveryNode> getNodesJoinInProgress();

Set<DiscoveryNode> getNodesLeftInProgress();

boolean markPendingJoinCompleted(DiscoveryNode node);

boolean markPendingLeftCompleted(DiscoveryNode node);

void markPendingJoins(List<DiscoveryNode> nodes);

void markPendingLefts(List<DiscoveryNode> nodes);

void disconnectFromNode(DiscoveryNode node);

Set<DiscoveryNode> getAllConnectedNodes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,12 @@ public ConnectionProfile getConnectionProfile() {
}

@Override
public void connectToNodeAndBlockDisconnects(
DiscoveryNode node,
ConnectionProfile connectionProfile,
ConnectionValidator connectionValidator,
ActionListener<Void> listener
) throws ConnectTransportException {
public Set<DiscoveryNode> getNodesJoinInProgress() {
throw new UnsupportedOperationException("not implemented");
}

@Override
public Set<DiscoveryNode> getNodesJoinInProgress() {
public Set<DiscoveryNode> getNodesLeftInProgress() {
throw new UnsupportedOperationException("not implemented");
}

Expand All @@ -139,6 +134,21 @@ public boolean markPendingJoinCompleted(DiscoveryNode node) {
throw new UnsupportedOperationException("not implemented");
}

/**
 * Not implemented by this connection manager.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public boolean markPendingLeftCompleted(DiscoveryNode node) {
throw new UnsupportedOperationException("not implemented");
}

/**
 * Not implemented by this connection manager.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void markPendingJoins(List<DiscoveryNode> nodes) {
throw new UnsupportedOperationException("not implemented");
}

/**
 * Not implemented by this connection manager.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void markPendingLefts(List<DiscoveryNode> nodes) {
throw new UnsupportedOperationException("not implemented");
}

public Transport.Connection getAnyRemoteConnection() {
List<DiscoveryNode> localConnectedNodes = this.connectedNodes;
long curr;
Expand Down
Loading

0 comments on commit 8411788

Please sign in to comment.