Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zen2] Add warning if cluster fails to form fast enough #35993

Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;

import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING;

public class ClusterFormationFailureHelper {
private static final Logger logger = LogManager.getLogger(ClusterFormationFailureHelper.class);

public static final Setting<TimeValue> DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING =
Setting.timeSetting("discovery.cluster_formation_warning_timeout",
TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

private final Supplier<ClusterFormationState> clusterFormationStateSupplier;
private final ThreadPool threadPool;
private final TimeValue clusterFormationWarningTimeout;
@Nullable // if no warning is scheduled
private volatile WarningScheduler warningScheduler;

public ClusterFormationFailureHelper(Settings settings, Supplier<ClusterFormationState> clusterFormationStateSupplier,
ThreadPool threadPool) {
this.clusterFormationStateSupplier = clusterFormationStateSupplier;
this.threadPool = threadPool;
this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings);
}

public boolean isRunning() {
return warningScheduler != null;
}

public void start() {
assert warningScheduler == null;
warningScheduler = new WarningScheduler();
warningScheduler.scheduleNextWarning();
}

public void stop() {
warningScheduler = null;
}

private class WarningScheduler {

private boolean isActive() {
return warningScheduler == this;
}

void scheduleNextWarning() {
threadPool.scheduleUnlessShuttingDown(clusterFormationWarningTimeout, Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.debug("unexpected exception scheduling cluster formation warning", e);
}

@Override
protected void doRun() {
if (isActive()) {
logger.warn(clusterFormationStateSupplier.get().getDescription());
}
}

@Override
public void onAfter() {
if (isActive()) {
scheduleNextWarning();
}
}

@Override
public String toString() {
return "emit warning if cluster not formed";
}
});
}
}

static class ClusterFormationState {
private final Settings settings;
private final ClusterState clusterState;
private final List<TransportAddress> resolvedAddresses;
private final List<DiscoveryNode> foundPeers;

ClusterFormationState(Settings settings, ClusterState clusterState, List<TransportAddress> resolvedAddresses,
List<DiscoveryNode> foundPeers) {
this.settings = settings;
this.clusterState = clusterState;
this.resolvedAddresses = resolvedAddresses;
this.foundPeers = foundPeers;
}

String getDescription() {
final List<String> clusterStateNodes
= StreamSupport.stream(clusterState.nodes().spliterator(), false).map(DiscoveryNode::toString).collect(Collectors.toList());

final String discoveryWillContinueDescription = String.format(Locale.ROOT,
"discovery will continue using %s from hosts providers and %s from last-known cluster state",
resolvedAddresses, clusterStateNodes);

final String discoveryStateIgnoringQuorum = String.format(Locale.ROOT, "have discovered %s; %s",
foundPeers, discoveryWillContinueDescription);

if (clusterState.nodes().getLocalNode().isMasterNode() == false) {
return String.format(Locale.ROOT, "master not discovered yet: %s", discoveryStateIgnoringQuorum);
}

if (clusterState.getLastAcceptedConfiguration().isEmpty()) {

// TODO handle the case that there is a 6.x node around here, when rolling upgrades are supported

final String bootstrappingDescription;

if (INITIAL_MASTER_NODE_COUNT_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODE_COUNT_SETTING.get(settings))
&& INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODES_SETTING.get(settings))) {
bootstrappingDescription = "cluster bootstrapping is disabled on this node";
} else if (INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODES_SETTING.get(settings))) {
bootstrappingDescription = String.format(Locale.ROOT,
"this node must discover at least [%d] master-eligible nodes to bootstrap a cluster",
INITIAL_MASTER_NODE_COUNT_SETTING.get(settings));
} else if (INITIAL_MASTER_NODE_COUNT_SETTING.get(settings) <= INITIAL_MASTER_NODES_SETTING.get(settings).size()) {
// TODO update this when we can bootstrap on only a quorum of the initial nodes
bootstrappingDescription = String.format(Locale.ROOT,
"this node must discover master-eligible nodes %s to bootstrap a cluster",
INITIAL_MASTER_NODES_SETTING.get(settings));
} else {
// TODO update this when we can bootstrap on only a quorum of the initial nodes
bootstrappingDescription = String.format(Locale.ROOT,
"this node must discover at least [%d] master-eligible nodes, including %s, to bootstrap a cluster",
INITIAL_MASTER_NODE_COUNT_SETTING.get(settings), INITIAL_MASTER_NODES_SETTING.get(settings));
}

return String.format(Locale.ROOT,
"master not discovered yet, this node has not previously joined a bootstrapped (v%d+) cluster, and %s: %s",
Version.V_6_6_0.major + 1, bootstrappingDescription, discoveryStateIgnoringQuorum);
}

assert clusterState.getLastCommittedConfiguration().isEmpty() == false;

final String quorumDescription;
if (clusterState.getLastAcceptedConfiguration().equals(clusterState.getLastCommittedConfiguration())) {
quorumDescription = describeQuorum(clusterState.getLastAcceptedConfiguration());
} else {
quorumDescription = describeQuorum(clusterState.getLastAcceptedConfiguration())
+ " and "
+ describeQuorum(clusterState.getLastCommittedConfiguration());
}

final VoteCollection voteCollection = new VoteCollection();
foundPeers.forEach(voteCollection::addVote);
final String isQuorumOrNot
= CoordinationState.isElectionQuorum(voteCollection, clusterState) ? "is a quorum" : "is not a quorum";

return String.format(Locale.ROOT,
"master not discovered or elected yet, an election requires %s, have discovered %s which %s; %s",
quorumDescription, foundPeers, isQuorumOrNot, discoveryWillContinueDescription);
}

private String describeQuorum(VotingConfiguration votingConfiguration) {
final Set<String> nodeIds = votingConfiguration.getNodeIds();
assert nodeIds.isEmpty() == false;

if (nodeIds.size() == 1) {
return "a node with id " + nodeIds;
} else if (nodeIds.size() == 2) {
return "two nodes with ids " + nodeIds;
} else {
return "at least " + (nodeIds.size() / 2 + 1) + " nodes with ids from " + nodeIds;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -121,6 +122,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final Reconfigurator reconfigurator;
private final ClusterBootstrapService clusterBootstrapService;
private final LagDetector lagDetector;
private final ClusterFormationFailureHelper clusterFormationFailureHelper;

private Mode mode;
private Optional<DiscoveryNode> lastKnownLeader;
Expand Down Expand Up @@ -161,6 +163,13 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
transportService.getThreadPool());
}

private ClusterFormationState getClusterFormationState() {
return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(),
StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList()));
}

private Runnable getOnLeaderFailure() {
Expand Down Expand Up @@ -374,6 +383,7 @@ void becomeCandidate(String method) {
joinAccumulator = joinHelper.new CandidateJoinAccumulator();

peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
clusterFormationFailureHelper.start();
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
leaderChecker.updateLeader(null);

Expand Down Expand Up @@ -404,6 +414,7 @@ void becomeLeader(String method) {

lastKnownLeader = Optional.of(getLocalNode());
peerFinder.deactivate(getLocalNode());
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());

Expand All @@ -428,6 +439,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {

lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
cancelActivePublication();
preVoteCollector.update(getPreVoteResponse(), leaderNode);
Expand Down Expand Up @@ -541,6 +553,7 @@ public void invariant() {
assert leaderChecker.leader() == null : leaderChecker.leader();
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;
assert clusterFormationFailureHelper.isRunning() == false;

final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false);
if (becomingMaster && activePublication == false) {
Expand Down Expand Up @@ -580,6 +593,7 @@ public void invariant() {
assert followersChecker.getKnownFollowers().isEmpty();
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
assert clusterFormationFailureHelper.isRunning() == false;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
Expand All @@ -592,6 +606,7 @@ public void invariant() {
assert applierState.nodes().getMasterNodeId() == null;
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader() == null : preVoteCollector;
assert clusterFormationFailureHelper.isRunning();
}
}
}
Expand Down Expand Up @@ -821,7 +836,7 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void
Strings.toString(clusterChangedEvent.previousState()).equals(
Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState())))
: Strings.toString(clusterChangedEvent.previousState()) + " vs "
+ Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState()));
+ Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState()));

final ClusterState clusterState = clusterChangedEvent.state();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.LagDetector;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.coordination.JoinHelper;
import org.elasticsearch.cluster.coordination.LagDetector;
import org.elasticsearch.cluster.coordination.LeaderChecker;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
Expand Down Expand Up @@ -457,6 +458,7 @@ public void apply(Settings value, Settings current, Settings previous) {
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,
ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING,
ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING,
Expand Down
11 changes: 9 additions & 2 deletions server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;

public abstract class PeerFinder {
Expand Down Expand Up @@ -94,6 +95,7 @@ public abstract class PeerFinder {
private DiscoveryNodes lastAcceptedNodes;
private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
private Optional<DiscoveryNode> leader = Optional.empty();
private volatile List<TransportAddress> lastResolvedAddresses = emptyList();

public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver) {
Expand Down Expand Up @@ -164,7 +166,7 @@ PeersResponse handlePeersRequest(PeersRequest peersRequest) {
knownPeers = getFoundPeersUnderLock();
} else {
assert leader.isPresent() || lastAcceptedNodes == null;
knownPeers = Collections.emptyList();
knownPeers = emptyList();
}
return new PeersResponse(leader, knownPeers, currentTerm);
}
Expand Down Expand Up @@ -207,6 +209,10 @@ private DiscoveryNode getLocalNode() {
*/
protected abstract void onFoundPeersUpdated();

public List<TransportAddress> getLastResolvedAddresses() {
return lastResolvedAddresses;
}

public interface TransportAddressConnector {
/**
* Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it.
Expand Down Expand Up @@ -266,6 +272,7 @@ private boolean handleWakeUp() {

configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> {
synchronized (mutex) {
lastResolvedAddresses = providedAddresses;
logger.trace("probing resolved transport addresses {}", providedAddresses);
providedAddresses.forEach(this::startProbe);
}
Expand Down Expand Up @@ -495,7 +502,7 @@ private class Zen1UnicastPingRequestHandler implements TransportRequestHandler<U
@Override
public void messageReceived(UnicastZenPing.UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
final PeersRequest peersRequest = new PeersRequest(request.pingResponse.node(),
Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(Collections.emptyList()));
Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(emptyList()));
final PeersResponse peersResponse = handlePeersRequest(peersRequest);
final List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
Expand Down
Loading