Skip to content

Commit

Permalink
[Zen2] Add warning if cluster fails to form fast enough (#35993)
Browse files Browse the repository at this point in the history
* Add warning if cluster fails to form fast enough

Today if a leader is not discovered or elected then nodes are essentially
silent at INFO and above, and log copiously at DEBUG and below. A short delay
when electing a leader is not unusual, for instance if other nodes have not yet
started, but a persistent failure to elect a leader is a problem worthy of log
messages in the default configuration.

With this change, while there is no leader each node outputs a WARN-level log
message every 10 seconds (by default) indicating as such, describing the
current discovery state and the current quorum(s).

* Add note about whether the discovered nodes form a quorum or not

* Introduce separate ClusterFormationFailureHelper

... and back out the unnecessary changes elsewhere

* It can be volatile
  • Loading branch information
DaveCTurner authored Dec 7, 2018
1 parent f2df0a5 commit 9d41798
Show file tree
Hide file tree
Showing 5 changed files with 532 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;

import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING;

public class ClusterFormationFailureHelper {
private static final Logger logger = LogManager.getLogger(ClusterFormationFailureHelper.class);

public static final Setting<TimeValue> DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING =
Setting.timeSetting("discovery.cluster_formation_warning_timeout",
TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

private final Supplier<ClusterFormationState> clusterFormationStateSupplier;
private final ThreadPool threadPool;
private final TimeValue clusterFormationWarningTimeout;
@Nullable // if no warning is scheduled
private volatile WarningScheduler warningScheduler;

public ClusterFormationFailureHelper(Settings settings, Supplier<ClusterFormationState> clusterFormationStateSupplier,
ThreadPool threadPool) {
this.clusterFormationStateSupplier = clusterFormationStateSupplier;
this.threadPool = threadPool;
this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings);
}

public boolean isRunning() {
return warningScheduler != null;
}

public void start() {
assert warningScheduler == null;
warningScheduler = new WarningScheduler();
warningScheduler.scheduleNextWarning();
}

public void stop() {
warningScheduler = null;
}

private class WarningScheduler {

private boolean isActive() {
return warningScheduler == this;
}

void scheduleNextWarning() {
threadPool.scheduleUnlessShuttingDown(clusterFormationWarningTimeout, Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.debug("unexpected exception scheduling cluster formation warning", e);
}

@Override
protected void doRun() {
if (isActive()) {
logger.warn(clusterFormationStateSupplier.get().getDescription());
}
}

@Override
public void onAfter() {
if (isActive()) {
scheduleNextWarning();
}
}

@Override
public String toString() {
return "emit warning if cluster not formed";
}
});
}
}

static class ClusterFormationState {
private final Settings settings;
private final ClusterState clusterState;
private final List<TransportAddress> resolvedAddresses;
private final List<DiscoveryNode> foundPeers;

ClusterFormationState(Settings settings, ClusterState clusterState, List<TransportAddress> resolvedAddresses,
List<DiscoveryNode> foundPeers) {
this.settings = settings;
this.clusterState = clusterState;
this.resolvedAddresses = resolvedAddresses;
this.foundPeers = foundPeers;
}

String getDescription() {
final List<String> clusterStateNodes
= StreamSupport.stream(clusterState.nodes().spliterator(), false).map(DiscoveryNode::toString).collect(Collectors.toList());

final String discoveryWillContinueDescription = String.format(Locale.ROOT,
"discovery will continue using %s from hosts providers and %s from last-known cluster state",
resolvedAddresses, clusterStateNodes);

final String discoveryStateIgnoringQuorum = String.format(Locale.ROOT, "have discovered %s; %s",
foundPeers, discoveryWillContinueDescription);

if (clusterState.nodes().getLocalNode().isMasterNode() == false) {
return String.format(Locale.ROOT, "master not discovered yet: %s", discoveryStateIgnoringQuorum);
}

if (clusterState.getLastAcceptedConfiguration().isEmpty()) {

// TODO handle the case that there is a 6.x node around here, when rolling upgrades are supported

final String bootstrappingDescription;

if (INITIAL_MASTER_NODE_COUNT_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODE_COUNT_SETTING.get(settings))
&& INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODES_SETTING.get(settings))) {
bootstrappingDescription = "cluster bootstrapping is disabled on this node";
} else if (INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODES_SETTING.get(settings))) {
bootstrappingDescription = String.format(Locale.ROOT,
"this node must discover at least [%d] master-eligible nodes to bootstrap a cluster",
INITIAL_MASTER_NODE_COUNT_SETTING.get(settings));
} else if (INITIAL_MASTER_NODE_COUNT_SETTING.get(settings) <= INITIAL_MASTER_NODES_SETTING.get(settings).size()) {
// TODO update this when we can bootstrap on only a quorum of the initial nodes
bootstrappingDescription = String.format(Locale.ROOT,
"this node must discover master-eligible nodes %s to bootstrap a cluster",
INITIAL_MASTER_NODES_SETTING.get(settings));
} else {
// TODO update this when we can bootstrap on only a quorum of the initial nodes
bootstrappingDescription = String.format(Locale.ROOT,
"this node must discover at least [%d] master-eligible nodes, including %s, to bootstrap a cluster",
INITIAL_MASTER_NODE_COUNT_SETTING.get(settings), INITIAL_MASTER_NODES_SETTING.get(settings));
}

return String.format(Locale.ROOT,
"master not discovered yet, this node has not previously joined a bootstrapped (v%d+) cluster, and %s: %s",
Version.V_6_6_0.major + 1, bootstrappingDescription, discoveryStateIgnoringQuorum);
}

assert clusterState.getLastCommittedConfiguration().isEmpty() == false;

final String quorumDescription;
if (clusterState.getLastAcceptedConfiguration().equals(clusterState.getLastCommittedConfiguration())) {
quorumDescription = describeQuorum(clusterState.getLastAcceptedConfiguration());
} else {
quorumDescription = describeQuorum(clusterState.getLastAcceptedConfiguration())
+ " and "
+ describeQuorum(clusterState.getLastCommittedConfiguration());
}

final VoteCollection voteCollection = new VoteCollection();
foundPeers.forEach(voteCollection::addVote);
final String isQuorumOrNot
= CoordinationState.isElectionQuorum(voteCollection, clusterState) ? "is a quorum" : "is not a quorum";

return String.format(Locale.ROOT,
"master not discovered or elected yet, an election requires %s, have discovered %s which %s; %s",
quorumDescription, foundPeers, isQuorumOrNot, discoveryWillContinueDescription);
}

private String describeQuorum(VotingConfiguration votingConfiguration) {
final Set<String> nodeIds = votingConfiguration.getNodeIds();
assert nodeIds.isEmpty() == false;

if (nodeIds.size() == 1) {
return "a node with id " + nodeIds;
} else if (nodeIds.size() == 2) {
return "two nodes with ids " + nodeIds;
} else {
return "at least " + (nodeIds.size() / 2 + 1) + " nodes with ids from " + nodeIds;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -121,6 +122,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final Reconfigurator reconfigurator;
private final ClusterBootstrapService clusterBootstrapService;
private final LagDetector lagDetector;
private final ClusterFormationFailureHelper clusterFormationFailureHelper;

private Mode mode;
private Optional<DiscoveryNode> lastKnownLeader;
Expand Down Expand Up @@ -161,6 +163,13 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
transportService.getThreadPool());
}

private ClusterFormationState getClusterFormationState() {
return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(),
StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList()));
}

private Runnable getOnLeaderFailure() {
Expand Down Expand Up @@ -374,6 +383,7 @@ void becomeCandidate(String method) {
joinAccumulator = joinHelper.new CandidateJoinAccumulator();

peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
clusterFormationFailureHelper.start();
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
leaderChecker.updateLeader(null);

Expand Down Expand Up @@ -404,6 +414,7 @@ void becomeLeader(String method) {

lastKnownLeader = Optional.of(getLocalNode());
peerFinder.deactivate(getLocalNode());
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());

Expand All @@ -428,6 +439,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {

lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
cancelActivePublication();
preVoteCollector.update(getPreVoteResponse(), leaderNode);
Expand Down Expand Up @@ -543,6 +555,7 @@ public void invariant() {
assert leaderChecker.leader() == null : leaderChecker.leader();
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;
assert clusterFormationFailureHelper.isRunning() == false;

final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false);
if (becomingMaster && activePublication == false) {
Expand Down Expand Up @@ -582,6 +595,7 @@ public void invariant() {
assert followersChecker.getKnownFollowers().isEmpty();
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
assert clusterFormationFailureHelper.isRunning() == false;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
Expand All @@ -594,6 +608,7 @@ public void invariant() {
assert applierState.nodes().getMasterNodeId() == null;
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader() == null : preVoteCollector;
assert clusterFormationFailureHelper.isRunning();
}
}
}
Expand Down Expand Up @@ -823,7 +838,7 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void
Strings.toString(clusterChangedEvent.previousState()).equals(
Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState())))
: Strings.toString(clusterChangedEvent.previousState()) + " vs "
+ Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState()));
+ Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState()));

final ClusterState clusterState = clusterChangedEvent.state();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.LagDetector;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.coordination.JoinHelper;
import org.elasticsearch.cluster.coordination.LagDetector;
import org.elasticsearch.cluster.coordination.LeaderChecker;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
Expand Down Expand Up @@ -457,6 +458,7 @@ public void apply(Settings value, Settings current, Settings previous) {
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,
ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING,
ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING,
Expand Down
11 changes: 9 additions & 2 deletions server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;

public abstract class PeerFinder {
Expand Down Expand Up @@ -94,6 +95,7 @@ public abstract class PeerFinder {
private DiscoveryNodes lastAcceptedNodes;
private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
private Optional<DiscoveryNode> leader = Optional.empty();
private volatile List<TransportAddress> lastResolvedAddresses = emptyList();

public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver) {
Expand Down Expand Up @@ -164,7 +166,7 @@ PeersResponse handlePeersRequest(PeersRequest peersRequest) {
knownPeers = getFoundPeersUnderLock();
} else {
assert leader.isPresent() || lastAcceptedNodes == null;
knownPeers = Collections.emptyList();
knownPeers = emptyList();
}
return new PeersResponse(leader, knownPeers, currentTerm);
}
Expand Down Expand Up @@ -207,6 +209,10 @@ private DiscoveryNode getLocalNode() {
*/
protected abstract void onFoundPeersUpdated();

public List<TransportAddress> getLastResolvedAddresses() {
return lastResolvedAddresses;
}

public interface TransportAddressConnector {
/**
* Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it.
Expand Down Expand Up @@ -266,6 +272,7 @@ private boolean handleWakeUp() {

configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> {
synchronized (mutex) {
lastResolvedAddresses = providedAddresses;
logger.trace("probing resolved transport addresses {}", providedAddresses);
providedAddresses.forEach(this::startProbe);
}
Expand Down Expand Up @@ -495,7 +502,7 @@ private class Zen1UnicastPingRequestHandler implements TransportRequestHandler<U
@Override
public void messageReceived(UnicastZenPing.UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
final PeersRequest peersRequest = new PeersRequest(request.pingResponse.node(),
Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(Collections.emptyList()));
Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(emptyList()));
final PeersResponse peersResponse = handlePeersRequest(peersRequest);
final List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
Expand Down
Loading

0 comments on commit 9d41798

Please sign in to comment.