From 3968ae432f2214fc541b0e89af1425fca0ba4d67 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 28 Nov 2018 11:42:01 +0000 Subject: [PATCH 1/4] Add warning if cluster fails to form fast enough Today if a leader is not discovered or elected then nodes are essentially silent at INFO and above, and log copiously at DEBUG and below. A short delay when electing a leader is not unusual, for instance if other nodes have not yet started, but a persistent failure to elect a leader is a problem worthy of log messages in the default configuration. With this change, while there is no leader each node outputs a WARN-level log message every 10 seconds (by default) indicating as such, describing the current discovery state and the current quorum(s). --- .../coordination/ClusterBootstrapService.java | 8 ++++ .../coordination/CoordinationMetaData.java | 14 +++++- .../coordination/CoordinationState.java | 8 ++++ .../cluster/coordination/Coordinator.java | 18 ++++++++ .../common/settings/ClusterSettings.java | 1 + .../elasticsearch/discovery/PeerFinder.java | 45 ++++++++++++++++++- .../ClusterBootstrapServiceTests.java | 9 ++++ .../CoordinationMetaDataTests.java | 10 +++++ .../coordination/CoordinationStateTests.java | 13 +++++- .../discovery/PeerFinderTests.java | 40 +++++++++++++++++ 10 files changed, 162 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java index 572631a8f61cd..7f795a531b415 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java @@ -158,4 +158,12 @@ public BootstrapClusterResponse read(StreamInput in) throws IOException { } }); } + + public String getBootstrapDescription() { + if (initialMasterNodeCount == 0) { + return "external cluster bootstrapping"; + } else { + return "discovery of at least " + initialMasterNodeCount + " master-eligible nodes for cluster bootstrapping"; + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationMetaData.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationMetaData.java index 7c57ccc908c12..a200f9dcd3573 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationMetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationMetaData.java @@ -190,7 +190,7 @@ public static class Builder { public Builder() { } - + public Builder(CoordinationMetaData state) { this.term = state.term; this.lastCommittedConfiguration = state.lastCommittedConfiguration; @@ -386,5 +386,17 @@ public static VotingConfiguration of(DiscoveryNode... nodes) { // this could be used in many more places - TODO use this where appropriate return new VotingConfiguration(Arrays.stream(nodes).map(DiscoveryNode::getId).collect(Collectors.toSet())); } + + public String getQuorumDescription() { + if (nodeIds.isEmpty()) { + return "cluster bootstrapping"; + } else if (nodeIds.size() == 1) { + return "node with id " + nodeIds; + } else if (nodeIds.size() == 2) { + return "two nodes with ids " + nodeIds; + } else { + return "at least " + (nodeIds.size() / 2 + 1) + " nodes with ids from " + nodeIds; + } + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index b145e68827236..22e5e904d05a7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -421,6 +421,14 @@ public void invariant() { assert publishVotes.isEmpty() || electionWon(); } + public String getQuorumDescription() { + if (getLastAcceptedConfiguration().equals(getLastCommittedConfiguration())) { + return getLastAcceptedConfiguration().getQuorumDescription(); + } else { + return getLastAcceptedConfiguration().getQuorumDescription() + " and " + getLastCommittedConfiguration().getQuorumDescription(); + } + } + /** * Pluggable persistence layer for {@link CoordinationState}. * diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 17601aeedf05c..d3b5232f334aa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; @@ -923,6 +924,23 @@ protected void onFoundPeersUpdated() { discoveredNodesListener.accept(foundPeers); } } + + @Override + protected void warnClusterFormationFailed(DiscoveryNodes clusterStateNodes, List resolvedAddresses, + List foundPeers) { + final String quorumDescription; + synchronized (mutex) { + if (isInitialConfigurationSet()) { + quorumDescription = coordinationState.get().getQuorumDescription(); + } else { + quorumDescription = clusterBootstrapService.getBootstrapDescription(); + } + } + + logger.warn("leader not discovered or elected yet: election requires {}, have discovered {}; discovery continues using " + + "{} from hosts providers and {} from last-known cluster state", quorumDescription, foundPeers, resolvedAddresses, + StreamSupport.stream(clusterStateNodes.spliterator(), false).map(DiscoveryNode::toString).collect(Collectors.toList())); + } } private void startElectionScheduler() { diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 69c63682b9de7..c0d9c9200712b 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -456,6 +456,7 @@ public void apply(Settings value, Settings current, Settings previous) { EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING, PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, + PeerFinder.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING, ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 92483167bb75f..65a86796679b3 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -62,6 +62,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import static java.util.Collections.emptyList; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; public abstract class PeerFinder { @@ -79,10 +80,15 @@ public abstract class PeerFinder { Setting.timeSetting("discovery.request_peers_timeout", TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + public static final Setting DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING = + Setting.timeSetting("discovery.cluster_formation_warning_timeout", + TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + private final Settings settings; private final TimeValue findPeersInterval; private final TimeValue requestPeersTimeout; + private final TimeValue clusterFormationWarningTimeout; private final Object mutex = new Object(); private final TransportService transportService; @@ -94,12 +100,15 @@ public abstract class PeerFinder { private DiscoveryNodes lastAcceptedNodes; private final Map peersByAddress = newConcurrentMap(); private Optional leader = Optional.empty(); + private List lastResolvedAddresses = emptyList(); + private long nextWarningTimeMillis; public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) { this.settings = settings; findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings); + clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings); this.transportService = transportService; this.transportAddressConnector = transportAddressConnector; this.configuredHostsResolver = configuredHostsResolver; @@ -120,12 +129,18 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) { active = true; this.lastAcceptedNodes = lastAcceptedNodes; leader = Optional.empty(); + updateNextWarningTime(); handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected } onFoundPeersUpdated(); // trigger a check for a quorum already } + private void updateNextWarningTime() { + assert holdsLock() : "PeerFinder mutex not held"; + nextWarningTimeMillis = transportService.getThreadPool().relativeTimeInMillis() + clusterFormationWarningTimeout.millis(); + } + public void deactivate(DiscoveryNode leader) { final boolean peersRemoved; synchronized (mutex) { @@ -162,7 +177,7 @@ PeersResponse handlePeersRequest(PeersRequest peersRequest) { knownPeers = getFoundPeersUnderLock(); } else { assert leader.isPresent(); - knownPeers = Collections.emptyList(); + knownPeers = emptyList(); } return new PeersResponse(leader, knownPeers, currentTerm); } @@ -264,6 +279,7 @@ private boolean handleWakeUp() { configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> { synchronized (mutex) { + lastResolvedAddresses = providedAddresses; logger.trace("probing resolved transport addresses {}", providedAddresses); providedAddresses.forEach(this::startProbe); } @@ -291,6 +307,11 @@ protected void doRun() { onFoundPeersUpdated(); } + @Override + public void onAfter() { + maybeWarnClusterFormationFailed(); + } + @Override public String toString() { return "PeerFinder handling wakeup"; @@ -300,6 +321,26 @@ public String toString() { return peersRemoved; } + private void maybeWarnClusterFormationFailed() { + assert holdsLock() == false : "PeerFinder mutex held in error"; + final DiscoveryNodes lastAcceptedNodes; + final List lastResolvedAddresses; + final List foundPeers; + synchronized (mutex) { + if (nextWarningTimeMillis >= transportService.getThreadPool().relativeTimeInMillis()) { + return; + } + updateNextWarningTime(); + lastAcceptedNodes = PeerFinder.this.lastAcceptedNodes; + lastResolvedAddresses = PeerFinder.this.lastResolvedAddresses; + foundPeers = getFoundPeersUnderLock(); + } + warnClusterFormationFailed(lastAcceptedNodes, lastResolvedAddresses, foundPeers); + } + + protected abstract void warnClusterFormationFailed(DiscoveryNodes clusterStateNodes, List resolvedAddresses, + List foundPeers); + private void startProbe(TransportAddress transportAddress) { assert holdsLock() : "PeerFinder mutex not held"; if (active == false) { @@ -493,7 +534,7 @@ private class Zen1UnicastPingRequestHandler implements TransportRequestHandler pingResponses = new ArrayList<>(); final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java index 952d7c0e7528a..f893192c77dd8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java @@ -50,6 +50,7 @@ import static java.util.Collections.singleton; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.is; public class ClusterBootstrapServiceTests extends ESTestCase { @@ -197,4 +198,12 @@ public String toString() { deterministicTaskQueue.runAllTasks(); // termination means success } + + public void testBootstrapDescription() { + assertThat(clusterBootstrapService.getBootstrapDescription(), + is("discovery of at least 3 master-eligible nodes for cluster bootstrapping")); + + assertThat(new ClusterBootstrapService(Settings.EMPTY, transportService).getBootstrapDescription(), + is("external cluster bootstrapping")); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationMetaDataTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationMetaDataTests.java index 698ee18efc2a3..239b63c34fa9d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationMetaDataTests.java @@ -35,7 +35,10 @@ import java.util.HashSet; import java.util.Set; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; public class CoordinationMetaDataTests extends ESTestCase { @@ -46,6 +49,7 @@ public void testVotingConfiguration() { assertThat(config0.isEmpty(), equalTo(true)); assertThat(config0.hasQuorum(Sets.newHashSet()), equalTo(false)); assertThat(config0.hasQuorum(Sets.newHashSet("id1")), equalTo(false)); + assertThat(config0.getQuorumDescription(), is("cluster bootstrapping")); VotingConfiguration config1 = new VotingConfiguration(Sets.newHashSet("id1")); assertThat(config1.getNodeIds(), equalTo(Sets.newHashSet("id1"))); @@ -54,6 +58,7 @@ public void testVotingConfiguration() { assertThat(config1.hasQuorum(Sets.newHashSet("id1", "id2")), equalTo(true)); assertThat(config1.hasQuorum(Sets.newHashSet("id2")), equalTo(false)); assertThat(config1.hasQuorum(Sets.newHashSet()), equalTo(false)); + assertThat(config1.getQuorumDescription(), is("node with id [id1]")); VotingConfiguration config2 = new VotingConfiguration(Sets.newHashSet("id1", "id2")); assertThat(config2.getNodeIds(), equalTo(Sets.newHashSet("id1", "id2"))); @@ -65,6 +70,7 @@ public void testVotingConfiguration() { assertThat(config2.hasQuorum(Sets.newHashSet("id3")), equalTo(false)); assertThat(config2.hasQuorum(Sets.newHashSet("id1", "id3")), equalTo(false)); assertThat(config2.hasQuorum(Sets.newHashSet()), equalTo(false)); + assertThat(config2.getQuorumDescription(), anyOf(is("two nodes with ids [id1, id2]"), is("two nodes with ids [id2, id1]"))); VotingConfiguration config3 = new VotingConfiguration(Sets.newHashSet("id1", "id2", "id3")); assertThat(config3.getNodeIds(), equalTo(Sets.newHashSet("id1", "id2", "id3"))); @@ -80,6 +86,10 @@ public void testVotingConfiguration() { assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id4")), equalTo(false)); assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id4", "id5")), equalTo(false)); assertThat(config3.hasQuorum(Sets.newHashSet()), equalTo(false)); + assertThat(config3.getQuorumDescription(), startsWith("at least 2 nodes with ids from [")); + + VotingConfiguration config4 = new VotingConfiguration(Sets.newHashSet("id1", "id2", "id3", "id4")); + assertThat(config4.getQuorumDescription(), startsWith("at least 3 nodes with ids from [")); } public void testVotingConfigurationSerializationEqualsHashCode() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java index ab2ad25e72152..7fa113c071ea6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java @@ -50,6 +50,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; public class CoordinationStateTests extends ESTestCase { @@ -766,6 +767,16 @@ public void testVoteCollection() { }); } + public void testQuorumDescription() { + VotingConfiguration node1Config = new VotingConfiguration(Collections.singleton(node1.getId())); + cs1.setInitialState(clusterState(0L, 1L, node1, node1Config, node1Config, 42L)); + assertThat(cs1.getQuorumDescription(), is("node with id [node1]")); + + VotingConfiguration node2Config = new VotingConfiguration(Collections.singleton(node2.getId())); + cs2.setInitialState(clusterState(0L, 1L, node1, node1Config, node2Config, 42L)); + assertThat(cs2.getQuorumDescription(), is("node with id [node2] and node with id [node1]")); + } + public void testSafety() { new Cluster(randomIntBetween(1, 5)).runRandomly(); } @@ -811,7 +822,7 @@ public static ClusterState setValue(ClusterState clusterState, long value) { public static long value(ClusterState clusterState) { return clusterState.metaData().persistentSettings().getAsLong("value", 0L); } - + static class ClusterNode { final DiscoveryNode localNode; diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index a7b8490cd9e80..f764ddacd69c5 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -152,6 +152,7 @@ public String toString() { class TestPeerFinder extends PeerFinder { DiscoveryNode discoveredMasterNode; OptionalLong discoveredMasterTerm = OptionalLong.empty(); + boolean emittedWarning; TestPeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector) { super(settings, transportService, transportAddressConnector, PeerFinderTests.this::resolveConfiguredHosts); @@ -172,6 +173,12 @@ protected void onFoundPeersUpdated() { foundPeersFromNotification = getFoundPeers(); logger.trace("onFoundPeersUpdated({})", foundPeersFromNotification); } + + @Override + protected void warnClusterFormationFailed(DiscoveryNodes clusterStateNodes, List resolvedAddresses, + List foundPeers) { + emittedWarning = true; + } } private void resolveConfiguredHosts(Consumer> onResult) { @@ -759,6 +766,39 @@ public void testReconnectsToDisconnectedNodes() { assertFoundPeers(rebootedOtherNode); } + public void testEmitsWarningsIfClusterDoesNotFormFastEnough() { + peerFinder.activate(lastAcceptedNodes); + assertFalse(peerFinder.emittedWarning); + + final long warningTimeout = PeerFinder.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(Settings.EMPTY).millis() + + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis(); + + final long expectedWarningTime1 = deterministicTaskQueue.getCurrentTimeMillis() + warningTimeout; + while (deterministicTaskQueue.getCurrentTimeMillis() < expectedWarningTime1) { + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + } + + assertTrue(peerFinder.emittedWarning); + peerFinder.emittedWarning = false; + + final long expectedNoWarningTime = deterministicTaskQueue.getCurrentTimeMillis() + + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis() * 3; + final long expectedWarningTime2 = deterministicTaskQueue.getCurrentTimeMillis() + warningTimeout; + + while (deterministicTaskQueue.getCurrentTimeMillis() < expectedNoWarningTime) { + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + } + assertFalse(peerFinder.emittedWarning); + + while (deterministicTaskQueue.getCurrentTimeMillis() < expectedWarningTime2) { + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + } + assertTrue(peerFinder.emittedWarning); + } + private void respondToRequests(Function responseFactory) { final CapturedRequest[] capturedRequests = capturingTransport.getCapturedRequestsAndClear(); for (final CapturedRequest capturedRequest : capturedRequests) { From ec8323616cb9bd9117f8a460c482a28102656345 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 3 Dec 2018 15:49:17 +0000 Subject: [PATCH 2/4] Add note about whether the discovered nodes form a quorum or not --- .../cluster/coordination/Coordinator.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 6b7c14b979877..ef33515d0dd6a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingTombstone; +import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; import org.elasticsearch.cluster.metadata.MetaData; @@ -942,8 +943,13 @@ protected void warnClusterFormationFailed(DiscoveryNodes clusterStateNodes, List } } - logger.warn("leader not discovered or elected yet: election requires {}, have discovered {}; discovery continues using " + - "{} from hosts providers and {} from last-known cluster state", quorumDescription, foundPeers, resolvedAddresses, + final VoteCollection possibleVotes = new VoteCollection(); + foundPeers.forEach(possibleVotes::addVote); + final String isQuorumOrNot = coordinationState.get().isElectionQuorum(possibleVotes) ? "is a quorum" : "is not a quorum"; + + logger.warn("leader not discovered or elected yet: election requires {}, have discovered {} which {}; discovery " + + "continues using {} from hosts providers and {} from last-known cluster state", quorumDescription, foundPeers, + isQuorumOrNot, resolvedAddresses, StreamSupport.stream(clusterStateNodes.spliterator(), false).map(DiscoveryNode::toString).collect(Collectors.toList())); } } From f6197d7789a676edd3c9700bf486ebb29dd2dd4f Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 7 Dec 2018 14:16:05 +0000 Subject: [PATCH 3/4] Introduce separate ClusterFormationFailureHelper ... and back out the unnecessary changes elsewhere --- .../coordination/ClusterBootstrapService.java | 8 - .../ClusterFormationFailureHelper.java | 210 ++++++++++++++ .../coordination/CoordinationMetaData.java | 14 +- .../coordination/CoordinationState.java | 8 - .../cluster/coordination/Coordinator.java | 41 ++- .../common/settings/ClusterSettings.java | 5 +- .../elasticsearch/discovery/PeerFinder.java | 44 +-- .../ClusterBootstrapServiceTests.java | 9 - .../ClusterFormationFailureHelperTests.java | 272 ++++++++++++++++++ .../CoordinationMetaDataTests.java | 10 - .../coordination/CoordinationStateTests.java | 13 +- .../discovery/PeerFinderTests.java | 40 --- 12 files changed, 508 insertions(+), 166 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java index b3683d0bd0661..e6e31d6d773ab 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java @@ -169,12 +169,4 @@ public BootstrapClusterResponse read(StreamInput in) throws IOException { } }); } - - public String getBootstrapDescription() { - if (initialMasterNodeCount == 0) { - return "external cluster bootstrapping"; - } else { - return "discovery of at least " + initialMasterNodeCount + " master-eligible nodes for cluster bootstrapping"; - } - } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java new file mode 100644 index 0000000000000..317991dc2d69b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java @@ -0,0 +1,210 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; +import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; + +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING; + +public class ClusterFormationFailureHelper { + private static final Logger logger = LogManager.getLogger(ClusterFormationFailureHelper.class); + + public static final Setting DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING = + Setting.timeSetting("discovery.cluster_formation_warning_timeout", + TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); + + private final Supplier clusterFormationStateSupplier; + private final ThreadPool threadPool; + private final TimeValue clusterFormationWarningTimeout; + private final AtomicReference warningScheduler; // null if no warning is scheduled + + public ClusterFormationFailureHelper(Settings settings, Supplier clusterFormationStateSupplier, + ThreadPool threadPool) { + this.clusterFormationStateSupplier = clusterFormationStateSupplier; + this.threadPool = threadPool; + this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings); + warningScheduler = new AtomicReference<>(); + } + + public boolean isRunning() { + return warningScheduler.get() != null; + } + + public void start() { + final WarningScheduler newWarningScheduler = new WarningScheduler(); + if (warningScheduler.compareAndSet(null, newWarningScheduler)) { + newWarningScheduler.scheduleNextWarning(); + } + } + + public void stop() { + warningScheduler.set(null); + } + + private class WarningScheduler { + + private boolean isActive() { + return warningScheduler.get() == this; + } + + void scheduleNextWarning() { + threadPool.scheduleUnlessShuttingDown(clusterFormationWarningTimeout, Names.GENERIC, new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.debug("unexpected exception scheduling cluster formation warning", e); + } + + @Override + protected void doRun() { + if (isActive()) { + logger.warn(clusterFormationStateSupplier.get().getDescription()); + } + } + + @Override + public void onAfter() { + if (isActive()) { + scheduleNextWarning(); + } + } + + @Override + public String toString() { + return "emit warning if cluster not formed"; + } + }); + } + } + + static class ClusterFormationState { + private final Settings settings; + private final ClusterState clusterState; + private final List resolvedAddresses; + private final List foundPeers; + + ClusterFormationState(Settings settings, ClusterState clusterState, List resolvedAddresses, + List foundPeers) { + this.settings = settings; + this.clusterState = clusterState; + this.resolvedAddresses = resolvedAddresses; + this.foundPeers = foundPeers; + } + + String getDescription() { + final List clusterStateNodes + = StreamSupport.stream(clusterState.nodes().spliterator(), false).map(DiscoveryNode::toString).collect(Collectors.toList()); + + final String discoveryWillContinueDescription = String.format(Locale.ROOT, + "discovery will continue using %s from hosts providers and %s from last-known cluster state", + resolvedAddresses, clusterStateNodes); + + final String discoveryStateIgnoringQuorum = String.format(Locale.ROOT, "have discovered %s; %s", + foundPeers, discoveryWillContinueDescription); + + if (clusterState.nodes().getLocalNode().isMasterNode() == false) { + return String.format(Locale.ROOT, "master not discovered yet: %s", discoveryStateIgnoringQuorum); + } + + if (clusterState.getLastAcceptedConfiguration().isEmpty()) { + + // TODO handle the case that there is a 6.x node around here, when rolling upgrades are supported + + final String bootstrappingDescription; + + if (INITIAL_MASTER_NODE_COUNT_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODE_COUNT_SETTING.get(settings)) + && INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODES_SETTING.get(settings))) { + bootstrappingDescription = "cluster bootstrapping is disabled on this node"; + } else if (INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODES_SETTING.get(settings))) { + bootstrappingDescription = String.format(Locale.ROOT, + "this node must discover at least [%d] master-eligible nodes to bootstrap a cluster", + INITIAL_MASTER_NODE_COUNT_SETTING.get(settings)); + } else if (INITIAL_MASTER_NODE_COUNT_SETTING.get(settings) <= INITIAL_MASTER_NODES_SETTING.get(settings).size()) { + // TODO update this when we can bootstrap on only a quorum of the initial nodes + bootstrappingDescription = String.format(Locale.ROOT, + "this node must discover master-eligible nodes %s to bootstrap a cluster", + INITIAL_MASTER_NODES_SETTING.get(settings)); + } else { + // TODO update this when we can bootstrap on only a quorum of the initial nodes + bootstrappingDescription = String.format(Locale.ROOT, + "this node must discover at least [%d] master-eligible nodes, including %s, to bootstrap a cluster", + INITIAL_MASTER_NODE_COUNT_SETTING.get(settings), INITIAL_MASTER_NODES_SETTING.get(settings)); + } + + return String.format(Locale.ROOT, + "master not discovered yet, this node has not previously joined a bootstrapped (v%d+) cluster, and %s: %s", + Version.V_6_6_0.major + 1, bootstrappingDescription, discoveryStateIgnoringQuorum); + } + + assert clusterState.getLastCommittedConfiguration().isEmpty() == false; + + final String quorumDescription; + if (clusterState.getLastAcceptedConfiguration().equals(clusterState.getLastCommittedConfiguration())) { + quorumDescription = describeQuorum(clusterState.getLastAcceptedConfiguration()); + } else { + quorumDescription = describeQuorum(clusterState.getLastAcceptedConfiguration()) + + " and " + + describeQuorum(clusterState.getLastCommittedConfiguration()); + } + + final VoteCollection voteCollection = new VoteCollection(); + foundPeers.forEach(voteCollection::addVote); + final String isQuorumOrNot + = CoordinationState.isElectionQuorum(voteCollection, clusterState) ? "is a quorum" : "is not a quorum"; + + return String.format(Locale.ROOT, + "master not discovered or elected yet, an election requires %s, have discovered %s which %s; %s", + quorumDescription, foundPeers, isQuorumOrNot, discoveryWillContinueDescription); + } + + private String describeQuorum(VotingConfiguration votingConfiguration) { + final Set nodeIds = votingConfiguration.getNodeIds(); + assert nodeIds.isEmpty() == false; + + if (nodeIds.size() == 1) { + return "a node with id " + nodeIds; + } else if (nodeIds.size() == 2) { + return "two nodes with ids " + nodeIds; + } else { + return "at least " + (nodeIds.size() / 2 + 1) + " nodes with ids from " + nodeIds; + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationMetaData.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationMetaData.java index 2c80b15173ac5..2bf5755df3144 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationMetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationMetaData.java @@ -190,7 +190,7 @@ public static class Builder { public Builder() { } - + public Builder(CoordinationMetaData state) { this.term = state.term; this.lastCommittedConfiguration = state.lastCommittedConfiguration; @@ -386,17 +386,5 @@ public static VotingConfiguration of(DiscoveryNode... nodes) { // this could be used in many more places - TODO use this where appropriate return new VotingConfiguration(Arrays.stream(nodes).map(DiscoveryNode::getId).collect(Collectors.toSet())); } - - public String getQuorumDescription() { - if (nodeIds.isEmpty()) { - return "cluster bootstrapping"; - } else if (nodeIds.size() == 1) { - return "node with id " + nodeIds; - } else if (nodeIds.size() == 2) { - return "two nodes with ids " + nodeIds; - } else { - return "at least " + (nodeIds.size() / 2 + 1) + " nodes with ids from " + nodeIds; - } - } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index 22e5e904d05a7..b145e68827236 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -421,14 +421,6 @@ public void invariant() { assert publishVotes.isEmpty() || electionWon(); } - public String getQuorumDescription() { - if (getLastAcceptedConfiguration().equals(getLastCommittedConfiguration())) { - return getLastAcceptedConfiguration().getQuorumDescription(); - } else { - return getLastAcceptedConfiguration().getQuorumDescription() + " and " + getLastCommittedConfiguration().getQuorumDescription(); - } - } - /** * Pluggable persistence layer for {@link CoordinationState}. * diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index a97fbb7992956..5c8f2983ff75a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -32,9 +32,9 @@ import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; -import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; import org.elasticsearch.cluster.metadata.MetaData; @@ -53,7 +53,6 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; @@ -123,6 +122,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final Reconfigurator reconfigurator; private final ClusterBootstrapService clusterBootstrapService; private final LagDetector lagDetector; + private final ClusterFormationFailureHelper clusterFormationFailureHelper; private Mode mode; private Optional lastKnownLeader; @@ -163,6 +163,13 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService); this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode); + this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState, + transportService.getThreadPool()); + } + + private ClusterFormationState getClusterFormationState() { + return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(), + StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList())); } private Runnable getOnLeaderFailure() { @@ -376,6 +383,7 @@ void becomeCandidate(String method) { joinAccumulator = joinHelper.new CandidateJoinAccumulator(); peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes()); + clusterFormationFailureHelper.start(); leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES); leaderChecker.updateLeader(null); @@ -406,6 +414,7 @@ void becomeLeader(String method) { lastKnownLeader = Optional.of(getLocalNode()); peerFinder.deactivate(getLocalNode()); + clusterFormationFailureHelper.stop(); closePrevotingAndElectionScheduler(); preVoteCollector.update(getPreVoteResponse(), getLocalNode()); @@ -430,6 +439,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) { lastKnownLeader = Optional.of(leaderNode); peerFinder.deactivate(leaderNode); + clusterFormationFailureHelper.stop(); closePrevotingAndElectionScheduler(); cancelActivePublication(); preVoteCollector.update(getPreVoteResponse(), leaderNode); @@ -543,6 +553,7 @@ public void invariant() { assert leaderChecker.leader() == null : leaderChecker.leader(); assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector; + assert clusterFormationFailureHelper.isRunning() == false; final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false); if (becomingMaster && activePublication == false) { @@ -582,6 +593,7 @@ public void invariant() { assert followersChecker.getKnownFollowers().isEmpty(); assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector; + assert clusterFormationFailureHelper.isRunning() == false; } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; @@ -594,6 +606,7 @@ public void invariant() { assert applierState.nodes().getMasterNodeId() == null; assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader() == null : preVoteCollector; + assert clusterFormationFailureHelper.isRunning(); } } } @@ -823,7 +836,7 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener resolvedAddresses, - List foundPeers) { - final String quorumDescription; - synchronized (mutex) { - if (isInitialConfigurationSet()) { - quorumDescription = coordinationState.get().getQuorumDescription(); - } else { - quorumDescription = clusterBootstrapService.getBootstrapDescription(); - } - } - - final VoteCollection possibleVotes = new VoteCollection(); - foundPeers.forEach(possibleVotes::addVote); - final String isQuorumOrNot = coordinationState.get().isElectionQuorum(possibleVotes) ? "is a quorum" : "is not a quorum"; - - logger.warn("leader not discovered or elected yet: election requires {}, have discovered {} which {}; discovery " + - "continues using {} from hosts providers and {} from last-known cluster state", quorumDescription, foundPeers, - isQuorumOrNot, resolvedAddresses, - StreamSupport.stream(clusterStateNodes.spliterator(), false).map(DiscoveryNode::toString).collect(Collectors.toList())); - } } private void startElectionScheduler() { diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index cd8f9e3bbcf2c..2e2e2394bb22e 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -33,11 +33,12 @@ import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.coordination.ClusterBootstrapService; -import org.elasticsearch.cluster.coordination.LagDetector; +import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory; import org.elasticsearch.cluster.coordination.FollowersChecker; import org.elasticsearch.cluster.coordination.JoinHelper; +import org.elasticsearch.cluster.coordination.LagDetector; import org.elasticsearch.cluster.coordination.LeaderChecker; import org.elasticsearch.cluster.coordination.Reconfigurator; import org.elasticsearch.cluster.metadata.IndexGraveyard; @@ -457,7 +458,7 @@ public void apply(Settings value, Settings current, Settings previous) { EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING, PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, - PeerFinder.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING, + ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING, ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index f9cbfb0a162f7..36090f645f600 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -80,15 +80,10 @@ public abstract class PeerFinder { Setting.timeSetting("discovery.request_peers_timeout", TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); - public static final Setting DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING = - Setting.timeSetting("discovery.cluster_formation_warning_timeout", - TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); - private final Settings settings; private final TimeValue findPeersInterval; private final TimeValue requestPeersTimeout; - private final TimeValue clusterFormationWarningTimeout; private final Object mutex = new Object(); private final TransportService transportService; @@ -100,15 +95,13 @@ public abstract class PeerFinder { private DiscoveryNodes lastAcceptedNodes; private final Map peersByAddress = newConcurrentMap(); private Optional leader = Optional.empty(); - private List lastResolvedAddresses = emptyList(); - private long nextWarningTimeMillis; + private volatile List lastResolvedAddresses = emptyList(); public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) { this.settings = settings; findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings); - clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings); this.transportService = transportService; this.transportAddressConnector = transportAddressConnector; this.configuredHostsResolver = configuredHostsResolver; @@ -129,18 +122,12 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) { active = true; this.lastAcceptedNodes = lastAcceptedNodes; leader = Optional.empty(); - updateNextWarningTime(); handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected } onFoundPeersUpdated(); // trigger a check for a quorum already } - private void updateNextWarningTime() { - assert holdsLock() : "PeerFinder mutex not held"; - nextWarningTimeMillis = transportService.getThreadPool().relativeTimeInMillis() + clusterFormationWarningTimeout.millis(); - } - public void deactivate(DiscoveryNode leader) { final boolean peersRemoved; synchronized (mutex) { @@ -222,6 +209,10 @@ private DiscoveryNode getLocalNode() { */ protected abstract void onFoundPeersUpdated(); + public List getLastResolvedAddresses() { + return lastResolvedAddresses; + } + public interface TransportAddressConnector { /** * Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it. @@ -309,11 +300,6 @@ protected void doRun() { onFoundPeersUpdated(); } - @Override - public void onAfter() { - maybeWarnClusterFormationFailed(); - } - @Override public String toString() { return "PeerFinder handling wakeup"; @@ -323,26 +309,6 @@ public String toString() { return peersRemoved; } - private void maybeWarnClusterFormationFailed() { - assert holdsLock() == false : "PeerFinder mutex held in error"; - final DiscoveryNodes lastAcceptedNodes; - final List lastResolvedAddresses; - final List foundPeers; - synchronized (mutex) { - if (nextWarningTimeMillis >= transportService.getThreadPool().relativeTimeInMillis()) { - return; - } - updateNextWarningTime(); - lastAcceptedNodes = PeerFinder.this.lastAcceptedNodes; - lastResolvedAddresses = PeerFinder.this.lastResolvedAddresses; - foundPeers = getFoundPeersUnderLock(); - } - warnClusterFormationFailed(lastAcceptedNodes, lastResolvedAddresses, foundPeers); - } - - protected abstract void warnClusterFormationFailed(DiscoveryNodes clusterStateNodes, List resolvedAddresses, - List foundPeers); - private void startProbe(TransportAddress transportAddress) { assert holdsLock() : "PeerFinder mutex not held"; if (active == false) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java index 2b9ffdb994c05..23030b9500fa3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java @@ -51,7 +51,6 @@ import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; -import static org.hamcrest.Matchers.is; public class ClusterBootstrapServiceTests extends ESTestCase { @@ -205,12 +204,4 @@ public String toString() { deterministicTaskQueue.runAllTasks(); // termination means success } - - public void testBootstrapDescription() { - assertThat(clusterBootstrapService.getBootstrapDescription(), - is("discovery of at least 3 master-eligible nodes for cluster bootstrapping")); - - assertThat(new ClusterBootstrapService(Settings.EMPTY, transportService).getBootstrapDescription(), - is("external cluster bootstrapping")); - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java new file mode 100644 index 0000000000000..38c81f5d45a19 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java @@ -0,0 +1,272 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState; +import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; +import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; + +public class ClusterFormationFailureHelperTests extends ESTestCase { + public void testScheduling() { + final long expectedDelayMillis; + final Settings.Builder settingsBuilder = Settings.builder(); + if (randomBoolean()) { + expectedDelayMillis + = ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(Settings.EMPTY).millis(); + } else { + expectedDelayMillis = randomLongBetween(100, 100000); + settingsBuilder.put(ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.getKey(), + expectedDelayMillis + "ms"); + } + + final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); + + final DeterministicTaskQueue deterministicTaskQueue + = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); + + final AtomicLong warningCount = new AtomicLong(); + + final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(), + () -> { + warningCount.incrementAndGet(); + return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList()); + }, + deterministicTaskQueue.getThreadPool()); + + deterministicTaskQueue.runAllTasks(); + assertThat("should not schedule anything yet", warningCount.get(), is(0L)); + + final long startTimeMillis = deterministicTaskQueue.getCurrentTimeMillis(); + clusterFormationFailureHelper.start(); + + while (warningCount.get() == 0) { + assertTrue(clusterFormationFailureHelper.isRunning()); + if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } else { + deterministicTaskQueue.advanceTime(); + } + } + assertThat(warningCount.get(), is(1L)); + assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, is(expectedDelayMillis)); + + while (warningCount.get() < 5) { + assertTrue(clusterFormationFailureHelper.isRunning()); + if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } else { + deterministicTaskQueue.advanceTime(); + } + } + assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, greaterThanOrEqualTo(5 * expectedDelayMillis)); + + clusterFormationFailureHelper.stop(); + assertFalse(clusterFormationFailureHelper.isRunning()); + deterministicTaskQueue.runAllTasksInTimeOrder(); + + assertThat(warningCount.get(), is(5L)); + } + + public void testDescriptionOnMasterIneligibleNodes() { + final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); + + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet: have discovered []; discovery will continue using [] from hosts providers and [" + localNode + + "] from last-known cluster state")); + + final TransportAddress otherAddress = buildNewFakeTransportAddress(); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList()).getDescription(), + is("master not discovered yet: have discovered []; discovery will continue using [" + otherAddress + + "] from hosts providers and [" + localNode + "] from last-known cluster state")); + + final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode)).getDescription(), + is("master not discovered yet: have discovered [" + otherNode + "]; discovery will continue using [] from hosts providers and [" + + localNode + "] from last-known cluster state")); + } + + public void testDescriptionBeforeBootstrapping() { + final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); + + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "cluster bootstrapping is disabled on this node: have discovered []; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + final TransportAddress otherAddress = buildNewFakeTransportAddress(); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "cluster bootstrapping is disabled on this node: have discovered []; " + + "discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode + + "] from last-known cluster state")); + + final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode)).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "cluster bootstrapping is disabled on this node: have discovered [" + otherNode + "]; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.builder().put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 2).build(), + clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "this node must discover at least [2] master-eligible nodes to bootstrap a cluster: have discovered []; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), "other").build(), + clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "this node must discover master-eligible nodes [other] to bootstrap a cluster: have discovered []; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), "other") + .put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1).build(), + clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "this node must discover master-eligible nodes [other] to bootstrap a cluster: have discovered []; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), "other") + .put(INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 2).build(), + clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + + "this node must discover at least [2] master-eligible nodes, including [other], to bootstrap a cluster: have discovered " + + "[]; discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + } + + private static VotingConfiguration config(String[] nodeIds) { + return new VotingConfiguration(Arrays.stream(nodeIds).collect(Collectors.toSet())); + } + + private static ClusterState state(DiscoveryNode localNode, String... configNodeIds) { + return state(localNode, configNodeIds, configNodeIds); + } + + private static ClusterState state(DiscoveryNode localNode, String[] acceptedConfig, String[] committedConfig) { + return ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())) + .metaData(MetaData.builder().coordinationMetaData(CoordinationMetaData.builder() + .lastAcceptedConfiguration(config(acceptedConfig)) + .lastCommittedConfiguration(config(committedConfig)).build())).build(); + } + + public void testDescriptionAfterBootstrapping() { + final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + + final ClusterState clusterState = state(localNode, "otherNode"); + + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [otherNode], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + final TransportAddress otherAddress = buildNewFakeTransportAddress(); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [otherNode], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode + + "] from last-known cluster state")); + + final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode)).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [otherNode], " + + "have discovered [" + otherNode + "] which is a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + final DiscoveryNode yetAnotherNode = new DiscoveryNode("yetAnotherNode", buildNewFakeTransportAddress(), Version.CURRENT); + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode)).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [otherNode], " + + "have discovered [" + yetAnotherNode + "] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2"), emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires two nodes with ids [n1, n2], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3"), emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires at least 2 nodes with ids from [n1, n2, n3], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4"), emptyList(), emptyList()) + .getDescription(), + is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", "n5"), emptyList(), emptyList()) + .getDescription(), + is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4, n5], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n1"}), + emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [n1], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2"}), + emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [n1] and a node with id [n2], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2", "n3"}), + emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [n1] and two nodes with ids [n2, n3], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2", "n3", "n4"}), + emptyList(), emptyList()).getDescription(), + is("master not discovered or elected yet, an election requires a node with id [n1] and " + + "at least 2 nodes with ids from [n2, n3, n4], " + + "have discovered [] which is not a quorum; " + + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state")); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationMetaDataTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationMetaDataTests.java index 4b9ece8f97736..5e366c6bdcce7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationMetaDataTests.java @@ -36,10 +36,7 @@ import java.util.HashSet; import java.util.Set; -import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.startsWith; public class CoordinationMetaDataTests extends ESTestCase { @@ -50,7 +47,6 @@ public void testVotingConfiguration() { assertThat(config0.isEmpty(), equalTo(true)); assertThat(config0.hasQuorum(Sets.newHashSet()), equalTo(false)); assertThat(config0.hasQuorum(Sets.newHashSet("id1")), equalTo(false)); - assertThat(config0.getQuorumDescription(), is("cluster bootstrapping")); VotingConfiguration config1 = new VotingConfiguration(Sets.newHashSet("id1")); assertThat(config1.getNodeIds(), equalTo(Sets.newHashSet("id1"))); @@ -59,7 +55,6 @@ public void testVotingConfiguration() { assertThat(config1.hasQuorum(Sets.newHashSet("id1", "id2")), equalTo(true)); assertThat(config1.hasQuorum(Sets.newHashSet("id2")), equalTo(false)); assertThat(config1.hasQuorum(Sets.newHashSet()), equalTo(false)); - assertThat(config1.getQuorumDescription(), is("node with id [id1]")); VotingConfiguration config2 = new VotingConfiguration(Sets.newHashSet("id1", "id2")); assertThat(config2.getNodeIds(), equalTo(Sets.newHashSet("id1", "id2"))); @@ -71,7 +66,6 @@ public void testVotingConfiguration() { assertThat(config2.hasQuorum(Sets.newHashSet("id3")), equalTo(false)); assertThat(config2.hasQuorum(Sets.newHashSet("id1", "id3")), equalTo(false)); assertThat(config2.hasQuorum(Sets.newHashSet()), equalTo(false)); - assertThat(config2.getQuorumDescription(), anyOf(is("two nodes with ids [id1, id2]"), is("two nodes with ids [id2, id1]"))); VotingConfiguration config3 = new VotingConfiguration(Sets.newHashSet("id1", "id2", "id3")); assertThat(config3.getNodeIds(), equalTo(Sets.newHashSet("id1", "id2", "id3"))); @@ -87,10 +81,6 @@ public void testVotingConfiguration() { assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id4")), equalTo(false)); assertThat(config3.hasQuorum(Sets.newHashSet("id1", "id4", "id5")), equalTo(false)); assertThat(config3.hasQuorum(Sets.newHashSet()), equalTo(false)); - assertThat(config3.getQuorumDescription(), startsWith("at least 2 nodes with ids from [")); - - VotingConfiguration config4 = new VotingConfiguration(Sets.newHashSet("id1", "id2", "id3", "id4")); - assertThat(config4.getQuorumDescription(), startsWith("at least 3 nodes with ids from [")); } public void testVotingConfigurationSerializationEqualsHashCode() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java index 7fa113c071ea6..ab2ad25e72152 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java @@ -50,7 +50,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; public class CoordinationStateTests extends ESTestCase { @@ -767,16 +766,6 @@ public void testVoteCollection() { }); } - public void testQuorumDescription() { - VotingConfiguration node1Config = new VotingConfiguration(Collections.singleton(node1.getId())); - cs1.setInitialState(clusterState(0L, 1L, node1, node1Config, node1Config, 42L)); - assertThat(cs1.getQuorumDescription(), is("node with id [node1]")); - - VotingConfiguration node2Config = new VotingConfiguration(Collections.singleton(node2.getId())); - cs2.setInitialState(clusterState(0L, 1L, node1, node1Config, node2Config, 42L)); - assertThat(cs2.getQuorumDescription(), is("node with id [node2] and node with id [node1]")); - } - public void testSafety() { new Cluster(randomIntBetween(1, 5)).runRandomly(); } @@ -822,7 +811,7 @@ public static ClusterState setValue(ClusterState clusterState, long value) { public static long value(ClusterState clusterState) { return clusterState.metaData().persistentSettings().getAsLong("value", 0L); } - + static class ClusterNode { final DiscoveryNode localNode; diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index e1f1951781887..78a2f2446c5dc 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -152,7 +152,6 @@ public String toString() { class TestPeerFinder extends PeerFinder { DiscoveryNode discoveredMasterNode; OptionalLong discoveredMasterTerm = OptionalLong.empty(); - boolean emittedWarning; TestPeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector) { super(settings, transportService, transportAddressConnector, PeerFinderTests.this::resolveConfiguredHosts); @@ -173,12 +172,6 @@ protected void onFoundPeersUpdated() { foundPeersFromNotification = getFoundPeers(); logger.trace("onFoundPeersUpdated({})", foundPeersFromNotification); } - - @Override - protected void warnClusterFormationFailed(DiscoveryNodes clusterStateNodes, List resolvedAddresses, - List foundPeers) { - emittedWarning = true; - } } private void resolveConfiguredHosts(Consumer> onResult) { @@ -780,39 +773,6 @@ public void testReconnectsToDisconnectedNodes() { assertFoundPeers(rebootedOtherNode); } - public void testEmitsWarningsIfClusterDoesNotFormFastEnough() { - peerFinder.activate(lastAcceptedNodes); - assertFalse(peerFinder.emittedWarning); - - final long warningTimeout = PeerFinder.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(Settings.EMPTY).millis() - + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis(); - - final long expectedWarningTime1 = deterministicTaskQueue.getCurrentTimeMillis() + warningTimeout; - while (deterministicTaskQueue.getCurrentTimeMillis() < expectedWarningTime1) { - deterministicTaskQueue.advanceTime(); - runAllRunnableTasks(); - } - - assertTrue(peerFinder.emittedWarning); - peerFinder.emittedWarning = false; - - final long expectedNoWarningTime = deterministicTaskQueue.getCurrentTimeMillis() - + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(Settings.EMPTY).millis() * 3; - final long expectedWarningTime2 = deterministicTaskQueue.getCurrentTimeMillis() + warningTimeout; - - while (deterministicTaskQueue.getCurrentTimeMillis() < expectedNoWarningTime) { - deterministicTaskQueue.advanceTime(); - runAllRunnableTasks(); - } - assertFalse(peerFinder.emittedWarning); - - while (deterministicTaskQueue.getCurrentTimeMillis() < expectedWarningTime2) { - deterministicTaskQueue.advanceTime(); - runAllRunnableTasks(); - } - assertTrue(peerFinder.emittedWarning); - } - private void respondToRequests(Function responseFactory) { final CapturedRequest[] capturedRequests = capturingTransport.getCapturedRequestsAndClear(); for (final CapturedRequest capturedRequest : capturedRequests) { From 69c27abf3453170ab755a10dafbbd7badc038655 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 7 Dec 2018 14:35:40 +0000 Subject: [PATCH 4/4] It can be volatile --- .../ClusterFormationFailureHelper.java | 19 +++++++------- .../ClusterFormationFailureHelperTests.java | 26 +++++++++++++++++-- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java index 317991dc2d69b..79142ec07c1d1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -36,7 +37,6 @@ import java.util.List; import java.util.Locale; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -54,35 +54,34 @@ public class ClusterFormationFailureHelper { private final Supplier clusterFormationStateSupplier; private final ThreadPool threadPool; private final TimeValue clusterFormationWarningTimeout; - private final AtomicReference warningScheduler; // null if no warning is scheduled + @Nullable // if no warning is scheduled + private volatile WarningScheduler warningScheduler; public ClusterFormationFailureHelper(Settings settings, Supplier clusterFormationStateSupplier, ThreadPool threadPool) { this.clusterFormationStateSupplier = clusterFormationStateSupplier; this.threadPool = threadPool; this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings); - warningScheduler = new AtomicReference<>(); } public boolean isRunning() { - return warningScheduler.get() != null; + return warningScheduler != null; } public void start() { - final WarningScheduler newWarningScheduler = new WarningScheduler(); - if (warningScheduler.compareAndSet(null, newWarningScheduler)) { - newWarningScheduler.scheduleNextWarning(); - } + assert warningScheduler == null; + warningScheduler = new WarningScheduler(); + warningScheduler.scheduleNextWarning(); } public void stop() { - warningScheduler.set(null); + warningScheduler = null; } private class WarningScheduler { private boolean isActive() { - return warningScheduler.get() == this; + return warningScheduler == this; } void scheduleNextWarning() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java index 38c81f5d45a19..40b15709fb730 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java @@ -41,7 +41,7 @@ import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; public class ClusterFormationFailureHelperTests extends ESTestCase { @@ -98,7 +98,29 @@ public void testScheduling() { deterministicTaskQueue.advanceTime(); } } - assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, greaterThanOrEqualTo(5 * expectedDelayMillis)); + assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, equalTo(5 * expectedDelayMillis)); + + clusterFormationFailureHelper.stop(); + assertFalse(clusterFormationFailureHelper.isRunning()); + deterministicTaskQueue.runAllTasksInTimeOrder(); + + assertThat(warningCount.get(), is(5L)); + + warningCount.set(0); + clusterFormationFailureHelper.start(); + clusterFormationFailureHelper.stop(); + clusterFormationFailureHelper.start(); + final long secondStartTimeMillis = deterministicTaskQueue.getCurrentTimeMillis(); + + while (warningCount.get() < 5) { + assertTrue(clusterFormationFailureHelper.isRunning()); + if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } else { + deterministicTaskQueue.advanceTime(); + } + } + assertThat(deterministicTaskQueue.getCurrentTimeMillis() - secondStartTimeMillis, equalTo(5 * expectedDelayMillis)); clusterFormationFailureHelper.stop(); assertFalse(clusterFormationFailureHelper.isRunning());