Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zen2: Fail fast on disconnects #34503

Merged
merged 11 commits into from
Oct 22, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Releasable electionScheduler;
@Nullable
private Releasable prevotingRound;
@Nullable
private Releasable leaderCheckScheduler;
private long maxTermSeen;

private Mode mode;
Expand Down Expand Up @@ -132,7 +130,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
this.publicationHandler = new PublicationTransportHandler(settings, transportService, this::handlePublishRequest,
this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure);
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
Expand All @@ -154,11 +152,11 @@ public String toString() {
};
}

private void onFollowerFailure(DiscoveryNode discoveryNode) {
private void removeNode(DiscoveryNode discoveryNode, String reason) {
synchronized (mutex) {
if (mode == Mode.LEADER) {
masterService.submitStateUpdateTask("node-left",
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, "node left"),
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
nodeRemovalExecutor,
nodeRemovalExecutor);
Expand Down Expand Up @@ -344,11 +342,7 @@ void becomeCandidate(String method) {

peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);

if (leaderCheckScheduler != null) {
leaderCheckScheduler.close();
leaderCheckScheduler = null;
}
leaderChecker.updateLeader(null);

followersChecker.clearCurrentNodes();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
Expand Down Expand Up @@ -377,7 +371,7 @@ void becomeLeader(String method) {
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());

assert leaderCheckScheduler == null : leaderCheckScheduler;
assert leaderChecker.leader() == null : leaderChecker.leader();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
}

Expand All @@ -401,10 +395,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
preVoteCollector.update(getPreVoteResponse(), leaderNode);

if (restartLeaderChecker) {
if (leaderCheckScheduler != null) {
leaderCheckScheduler.close();
}
leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode);
leaderChecker.updateLeader(leaderNode);
}

followersChecker.clearCurrentNodes();
Expand Down Expand Up @@ -501,7 +492,7 @@ public void invariant() {
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService();
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert leaderChecker.leader() == null : leaderChecker.leader();
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;

Expand Down Expand Up @@ -533,7 +524,7 @@ public void invariant() {
assert prevotingRound == null : prevotingRound;
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler != null;
assert lastKnownLeader.equals(Optional.of(leaderChecker.leader()));
assert followersChecker.getKnownFollowers().isEmpty();
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
Expand All @@ -544,7 +535,7 @@ public void invariant() {
assert prevotingRound == null || electionScheduler != null;
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert leaderChecker.leader() == null : leaderChecker.leader();
assert followersChecker.getKnownFollowers().isEmpty();
assert applierState.nodes().getMasterNodeId() == null;
assert currentPublication.map(Publication::isCommitted).orElse(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
Expand All @@ -46,6 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -78,7 +80,7 @@ public class FollowersChecker extends AbstractComponent {
private final TimeValue followerCheckInterval;
private final TimeValue followerCheckTimeout;
private final int followerCheckRetryCount;
private final Consumer<DiscoveryNode> onNodeFailure;
private final BiConsumer<DiscoveryNode, String> onNodeFailure;
private final Consumer<FollowerCheckRequest> handleRequestAndUpdateState;

private final Object mutex = new Object(); // protects writes to this state; read access does not need sync
Expand All @@ -91,7 +93,7 @@ public class FollowersChecker extends AbstractComponent {

public FollowersChecker(Settings settings, TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
Consumer<DiscoveryNode> onNodeFailure) {
BiConsumer<DiscoveryNode, String> onNodeFailure) {
super(settings);
this.transportService = transportService;
this.handleRequestAndUpdateState = handleRequestAndUpdateState;
Expand All @@ -104,6 +106,12 @@ public FollowersChecker(Settings settings, TransportService transportService,
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, FollowerCheckRequest::new,
(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel));
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleDisconnectedNode(node);
}
});
}

/**
Expand Down Expand Up @@ -228,6 +236,15 @@ Set<DiscoveryNode> getKnownFollowers() {
}
}

/**
 * Handles a disconnect notification for {@code discoveryNode}. If a follower checker is registered for that node and it is
 * still running, the checker is asked to fail the node; otherwise nothing happens. The lookup and the call are performed
 * while holding {@code mutex}.
 *
 * @param discoveryNode the node that was reported as disconnected
 */
private void handleDisconnectedNode(DiscoveryNode discoveryNode) {
    synchronized (mutex) {
        final FollowerChecker checker = followerCheckers.get(discoveryNode);
        if (checker == null || checker.running() == false) {
            // no active checker for this node, so there is nothing to fail
            return;
        }
        checker.failNode();
    }
}

static class FastResponseState {
final long term;
final Mode mode;
Expand Down Expand Up @@ -314,25 +331,7 @@ public void handleException(TransportException exp) {
return;
}

transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (running() == false) {
logger.debug("{} no longer running, not marking faulty", FollowerChecker.this);
return;
}
faultyNodes.add(discoveryNode);
followerCheckers.remove(discoveryNode);
}
onNodeFailure.accept(discoveryNode);
}

@Override
public String toString() {
return "detected failure of " + discoveryNode;
}
});
failNode();
}


Expand All @@ -343,6 +342,28 @@ public String executor() {
});
}

/**
 * Submits a task to the generic thread pool that marks this checker's node as faulty. When the task runs it re-checks, under
 * {@code mutex}, that this checker is still running; if so it records the node in {@code faultyNodes}, removes its entry from
 * {@code followerCheckers} and then, outside the lock, notifies {@code onNodeFailure} with the reason
 * {@code "followers_checker"}. If the checker is no longer running the task only logs at debug level.
 */
void failNode() {
    final Runnable markFaultyTask = new Runnable() {
        @Override
        public String toString() {
            return "detected failure of " + discoveryNode;
        }

        @Override
        public void run() {
            synchronized (mutex) {
                if (running()) {
                    faultyNodes.add(discoveryNode);
                    followerCheckers.remove(discoveryNode);
                } else {
                    logger.debug("{} condition no longer applies, not marking faulty", discoveryNode);
                    return;
                }
            }
            // invoked after the mutex has been released
            onNodeFailure.accept(discoveryNode, "followers_checker");
        }
    };
    transportService.getThreadPool().generic().execute(markFaultyTask);
}

private void scheduleNextWakeUp() {
transportService.getThreadPool().schedule(followerCheckInterval, Names.SAME, new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
Expand All @@ -46,6 +48,7 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
Expand Down Expand Up @@ -77,6 +80,8 @@ public class LeaderChecker extends AbstractComponent {
private final TransportService transportService;
private final Runnable onLeaderFailure;

// Holds the check scheduler currently in use, or null when there is none. Only the referenced value changes (via
// get/getAndSet); the AtomicReference itself is never reassigned, so the field is final.
private final AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();

private volatile DiscoveryNodes discoveryNodes;

public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
Expand All @@ -88,19 +93,39 @@ public LeaderChecker(final Settings settings, final TransportService transportSe
this.onLeaderFailure = onLeaderFailure;

transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, LeaderCheckRequest::new, this::handleLeaderCheck);
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleDisconnectedNode(node);
}
});
}

/**
 * Returns the leader associated with the current check scheduler.
 *
 * @return the {@code leader} of the current check scheduler, or {@code null} if no check scheduler is set
 */
public DiscoveryNode leader() {
    final CheckScheduler activeScheduler = currentChecker.get();
    if (activeScheduler != null) {
        return activeScheduler.leader;
    }
    return null;
}

/**
* Start a leader checker for the given leader. Should only be called after successfully joining this leader.
* Starts and / or stops a leader checker for the given leader. Should only be called after successfully joining this leader.
*
* @param leader the node to be checked as leader
* @return a `Releasable` that can be used to stop this checker.
* @param leader the node to be checked as leader, or null if checks should be disabled
*/
public Releasable startLeaderChecker(final DiscoveryNode leader) {
assert transportService.getLocalNode().equals(leader) == false;
CheckScheduler checkScheduler = new CheckScheduler(leader);
checkScheduler.handleWakeUp();
return checkScheduler;
public void updateLeader(@Nullable final DiscoveryNode leader) {
assert leader == null || transportService.getLocalNode().equals(leader) == false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leader == null is redundant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right

final CheckScheduler checkScheduler;
if (leader != null) {
checkScheduler = new CheckScheduler(leader);
} else {
checkScheduler = null;
}
CheckScheduler previousChecker = currentChecker.getAndSet(checkScheduler);
if (previousChecker != null) {
previousChecker.close();
}
if (checkScheduler != null) {
checkScheduler.handleWakeUp();
}
}

/**
Expand Down Expand Up @@ -137,6 +162,15 @@ private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel tran
}
}

/**
 * Forwards a disconnect notification to the current check scheduler, if there is one. When no check scheduler is set the
 * event is only logged at trace level.
 *
 * @param discoveryNode the node that was reported as disconnected
 */
private void handleDisconnectedNode(DiscoveryNode discoveryNode) {
    final CheckScheduler activeScheduler = currentChecker.get();
    if (activeScheduler == null) {
        logger.trace("disconnect event ignored for {}, no check scheduler", discoveryNode);
        return;
    }
    activeScheduler.handleDisconnectedNode(discoveryNode);
}

private class CheckScheduler implements Releasable {

private final AtomicBoolean isClosed = new AtomicBoolean();
Expand Down Expand Up @@ -222,14 +256,20 @@ public String executor() {
});
}

private void leaderFailed() {
void leaderFailed() {
if (isClosed.compareAndSet(false, true)) {
transportService.getThreadPool().generic().execute(onLeaderFailure);
} else {
logger.debug("already closed, not failing leader");
}
}

/**
 * Treats a disconnect from the node this scheduler is checking as a leader failure. Disconnects from any other node are
 * ignored.
 *
 * @param discoveryNode the node that was reported as disconnected
 */
void handleDisconnectedNode(DiscoveryNode discoveryNode) {
    if (discoveryNode.equals(leader) == false) {
        // not the leader being checked by this scheduler
        return;
    }
    leaderFailed();
}

private void scheduleNextWakeUp() {
logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval);
transportService.getThreadPool().schedule(leaderCheckInterval, Names.SAME, new Runnable() {
Expand Down
Loading