
cleanup unused code and remove added log lines
Signed-off-by: Rahul Karajgikar <[email protected]>
Rahul Karajgikar committed Sep 2, 2024
1 parent 8411788 commit f0cf40c
Showing 15 changed files with 31 additions and 171 deletions.
@@ -52,10 +52,8 @@
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -181,12 +179,6 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
runnables.forEach(Runnable::run);
}

public void markPendingJoinsAsComplete(List<DiscoveryNode> nodesConnected) {
for (final DiscoveryNode discoveryNode : nodesConnected) {
transportService.markPendingJoinAsCompleted(discoveryNode);
}
}

void ensureConnections(Runnable onCompletion) {
// Called by tests after some disruption has concluded. It is possible that one or more targets are currently CONNECTING and have
// been since the disruption was active, and that the connection attempt was thwarted by a concurrent disruption to the connection.
@@ -343,30 +335,20 @@ private class ConnectionTarget {
final AbstractRunnable abstractRunnable = this;

@Override
protected void doRun() throws IOException {
protected void doRun() {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
// if we are trying a connect activity while a node left is in progress, fail
if (transportService.getNodesLeftInProgress().contains(discoveryNode)) {
throw new ConnectTransportException(discoveryNode, "failed to connect while node-left in progress");
}
if (transportService.nodeConnected(discoveryNode)) {
// transportService.connectToNode is a no-op if already connected, but we don't want any DEBUG logging in this case
// since we run this for every node on every cluster state update.
logger.trace("still connected to {}", discoveryNode);
onConnected();
} else {
logger.info("connecting to {}", discoveryNode);
logger.debug("connecting to {}", discoveryNode);
transportService.connectToNode(discoveryNode, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
logger.info("connected to {}", discoveryNode);
transportService.markPendingJoinAsCompleted(discoveryNode);
logger.info(
"marked pending join for {} as completed. new list of nodes pending join: {}",
discoveryNode,
transportService.getNodesJoinInProgress()
);
logger.debug("connected to {}", discoveryNode);
onConnected();
}

@@ -407,11 +389,10 @@ public String toString() {
@Override
protected void doRun() {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
logger.info("disconnecting from {}", discoveryNode);
transportService.disconnectFromNode(discoveryNode);
transportService.markPendingLeftAsCompleted(discoveryNode);
consecutiveFailureCount.set(0);
logger.info(
logger.debug(
"disconnected from {} and marked pending left as completed. " + "pending lefts: [{}]",
discoveryNode,
transportService.getNodesLeftInProgress()
@@ -444,7 +425,6 @@ Runnable connect(@Nullable ActionListener<Void> listener) {
}

Runnable disconnect() {
logger.info("running runnable disconnect");
return addListenerAndStartActivity(
null,
ActivityType.DISCONNECTING,
@@ -417,8 +417,7 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {

synchronized (mutex) {
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getClusterManagerNode();
logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
logger.info(
logger.debug(
"handlePublishRequest: handling version [{}] from [{}]",
publishRequest.getAcceptedState().getVersion(),
sourceNode
@@ -631,7 +630,7 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
);
return;
}
// add a check here, if node-left is still in progress, we fail the connection
// if node-left is still in progress, we fail the joinRequest early
if (transportService.getNodesLeftInProgress().contains(joinRequest.getSourceNode())) {
joinCallback.onFailure(
new IllegalStateException(
@@ -642,7 +641,6 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
return;
}

// cluster manager connects to the node
transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
final ClusterState stateForJoinValidation = getStateForClusterManagerService();
if (stateForJoinValidation.nodes().isLocalNodeElectedClusterManager()) {
@@ -781,7 +779,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
if (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) {
logger.trace("{}: coordinator remaining FOLLOWER of [{}] in term {}", method, leaderNode, getCurrentTerm());
} else {
logger.info(
logger.debug(
"{}: coordinator becoming FOLLOWER of [{}] in term {} (was {}, lastKnownLeader was [{}])",
method,
leaderNode,
@@ -924,7 +922,7 @@ public DiscoveryStats stats() {
@Override
public void startInitialJoin() {
synchronized (mutex) {
logger.info("Starting initial join, becoming candidate");
logger.trace("Starting initial join, becoming candidate");
becomeCandidate("startInitialJoin");
}
clusterBootstrapService.scheduleUnconfiguredBootstrap();
@@ -1366,13 +1364,12 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
currentPublication = Optional.of(publication);

final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();

leaderChecker.setCurrentNodes(publishNodes);
followersChecker.setCurrentNodes(publishNodes);
lagDetector.setTrackedNodes(publishNodes);
coordinationState.get().handlePrePublish(clusterState);
// trying to mark pending connects/disconnects before publish
// if we try to connect during pending disconnect or vice versa - fail
// trying to mark pending disconnects before publish
// if we try to joinRequest during pending disconnect, it should fail
transportService.markPendingConnections(clusterChangedEvent.nodesDelta());
publication.start(followersChecker.getFaultyNodes());
}
@@ -176,7 +176,6 @@ private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
* Update the set of known nodes, starting to check any new ones and stopping checking any previously-known-but-now-unknown ones.
*/
public void setCurrentNodes(DiscoveryNodes discoveryNodes) {
logger.info("[{}]Setting followerschecker currentnodes to {}", Thread.currentThread().getName(), discoveryNodes);
synchronized (mutex) {
final Predicate<DiscoveryNode> isUnknownNode = n -> discoveryNodes.nodeExists(n) == false;
followerCheckers.keySet().removeIf(isUnknownNode);
@@ -358,9 +357,6 @@ private void handleWakeUp() {

final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term, transportService.getLocalNode());
logger.trace("handleWakeUp: checking {} with {}", discoveryNode, request);
if (discoveryNode.getName().equals("node_t2")) {
logger.info("handleWakeUp: checking {} with {}", discoveryNode, request);
}

transportService.sendRequest(
discoveryNode,
@@ -396,19 +392,6 @@ public void handleException(TransportException exp) {

final String reason;

// if (exp instanceof NodeNotConnectedException || exp.getCause() instanceof NodeNotConnectedException){
// // NodeNotConnectedException will only happen if getConnection fails in TransportService.sendRequest
// // This only happens if clusterConnectionManager.getConnection() does not find the entry in connectedNodes list
// // This happens on node disconnection
// // Need to validate that this only gets triggered from node-left side. we want to ensure actual disconnections
// work
// failureCountSinceLastSuccess--;
// logger.info(() -> new ParameterizedMessage("{} cache entry not found, but node is still in cluster state.
// ignoring this failure", FollowerChecker.this), exp);
// scheduleNextWakeUp();
// return;
// }

if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.info(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
reason = "disconnected";
@@ -379,7 +379,7 @@ public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join>
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {
logger.info("attempting to join {} with {}", destination, joinRequest);
logger.debug("attempting to join {} with {}", destination, joinRequest);
transportService.sendRequest(
destination,
JOIN_ACTION_NAME,
@@ -394,7 +394,7 @@ public Empty read(StreamInput in) {
@Override
public void handleResponse(Empty response) {
pendingOutgoingJoins.remove(dedupKey);
logger.info("successfully joined {} with {}", destination, joinRequest);
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt.set(null);
nodeCommissioned.accept(true);
onCompletion.run();
@@ -218,20 +218,20 @@ private void handleLeaderCheck(LeaderCheckRequest request) {
+ "since node is unhealthy ["
+ statusInfo.getInfo()
+ "]";
logger.info(message);
logger.debug(message);
throw new NodeHealthCheckFailureException(message);
} else if (discoveryNodes.isLocalNodeElectedClusterManager() == false) {
logger.info("rejecting leader check on non-cluster-manager {}", request);
logger.debug("rejecting leader check on non-cluster-manager {}", request);
throw new CoordinationStateRejectedException(
"rejecting leader check from [" + request.getSender() + "] sent to a node that is no longer the cluster-manager"
);
} else if (discoveryNodes.nodeExists(request.getSender()) == false) {
logger.info("rejecting leader check from removed node: {}", request);
logger.debug("rejecting leader check from removed node: {}", request);
throw new CoordinationStateRejectedException(
"rejecting leader check since [" + request.getSender() + "] has been removed from the cluster"
);
} else {
logger.info("handling {}", request);
logger.debug("handling {}", request);
}
}

@@ -306,17 +306,17 @@ public void handleException(TransportException exp) {
return;
}
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.info(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
return;
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.info(new ParameterizedMessage("leader [{}] health check failed", leader), exp);
logger.debug(new ParameterizedMessage("leader [{}] health check failed", leader), exp);
leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp));
return;
}
long failureCount = failureCountSinceLastSuccess.incrementAndGet();
if (failureCount >= leaderCheckRetryCount) {
logger.info(
logger.debug(
new ParameterizedMessage(
"leader [{}] has failed {} consecutive checks (limit [{}] is {}); last failure was:",
leader,
@@ -86,10 +86,9 @@ public Publication(PublishRequest publishRequest, AckListener ackListener, LongS

public void start(Set<DiscoveryNode> faultyNodes) {
logger.trace("publishing {} to {}", publishRequest, publicationTargets);
logger.info("publishing {} to {}", publishRequest.getAcceptedState().getVersion(), publicationTargets);
logger.info("publishing version {} to {}", publishRequest.getAcceptedState().getVersion(), publicationTargets);

for (final DiscoveryNode faultyNode : faultyNodes) {
logger.info("in publish.start, found faulty node: [{}]", faultyNode);
onFaultyNode(faultyNode);
}
onPossibleCommitFailure();
@@ -334,7 +333,7 @@ void setFailed(Exception e) {

void onFaultyNode(DiscoveryNode faultyNode) {
if (isActive() && discoveryNode.equals(faultyNode)) {
logger.info("onFaultyNode: [{}] is faulty, failing target in publication {}", faultyNode, Publication.this);
logger.debug("onFaultyNode: [{}] is faulty, failing target in publication {}", faultyNode, Publication.this);
setFailed(new OpenSearchException("faulty node"));
onPossibleCommitFailure();
}
@@ -473,10 +473,10 @@ public void onFailure(Exception e) {
logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination);
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.info("sending full cluster state version [{}] to [{}]", newState.version(), destination);
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, responseActionListener);
} else {
logger.info("sending cluster state diff for version [{}] to [{}]", newState.version(), destination);
logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination);
sendClusterStateDiff(destination, responseActionListener);
}
}
@@ -613,7 +613,7 @@ private void sendClusterState(
logger.debug("resending full cluster state to node {} reason {}", destination, exp.getDetailedMessage());
sendFullClusterState(destination, listener);
} else {
logger.info(() -> new ParameterizedMessage("failed to send cluster state to {}", destination), exp);
logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", destination), exp);
listener.onFailure(exp);
}
};