From 5c20e6366b976701f349492df49fa1e25d9fb9bf Mon Sep 17 00:00:00 2001 From: David Turner Date: Sun, 7 Oct 2018 08:18:17 +0100 Subject: [PATCH 1/5] Add low-level bootstrap implementation Today we inject the initial configuration of the cluster (i.e. the set of voting nodes) at startup. In reality we must support injecting the initial configuration after startup too. This commit adds low-level support for doing so as safely as possible. --- .../coordination/CoordinationState.java | 18 ++-- .../cluster/coordination/Coordinator.java | 82 ++++++++++---- .../cluster/service/MasterService.java | 2 +- .../coordination/CoordinatorTests.java | 101 +++++++++++++++++- 4 files changed, 167 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index d5b9cdf6adfc3..fb7071e70806a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -132,17 +132,17 @@ public void setInitialState(ClusterState initialState) { throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion); } - assert getLastAcceptedTerm() == 0; - assert getLastAcceptedConfiguration().isEmpty(); - assert getLastCommittedConfiguration().isEmpty(); - assert lastPublishedVersion == 0; - assert lastPublishedConfiguration.isEmpty(); + assert getLastAcceptedTerm() == 0 : getLastAcceptedTerm(); + assert getLastAcceptedConfiguration().isEmpty() : getLastAcceptedConfiguration(); + assert getLastCommittedConfiguration().isEmpty() : getLastCommittedConfiguration(); + assert lastPublishedVersion == 0 : lastAcceptedVersion; + assert lastPublishedConfiguration.isEmpty() : lastPublishedConfiguration; assert electionWon == false; - assert joinVotes.isEmpty(); - assert publishVotes.isEmpty(); + assert joinVotes.isEmpty() : joinVotes; + assert publishVotes.isEmpty() : publishVotes; - assert initialState.term() == 0; - assert initialState.version() == 1; + assert initialState.term() == 0 : initialState; + assert initialState.version() == 1 : initialState; assert initialState.getLastAcceptedConfiguration().isEmpty() == false; assert initialState.getLastCommittedConfiguration().isEmpty() == false; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index a8e21da705ce7..c193d6e89c070 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -24,6 +24,8 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.Builder; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; @@ -480,6 +482,8 @@ public void invariant() { assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id()); + assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) + : preVoteCollector + " vs " + getPreVoteResponse(); if (mode == Mode.LEADER) { final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); @@ -493,7 +497,6 @@ public void invariant() { assert leaderCheckScheduler == null : leaderCheckScheduler; assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false); if (becomingMaster && activePublication == false) { @@ -527,7 +530,6 @@ public void invariant() { assert followersChecker.getKnownFollowers().isEmpty(); assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; @@ -540,11 +542,41 @@ public void invariant() { assert applierState.nodes().getMasterNodeId() == null; assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader() == null : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; } } } + public void setInitialConfiguration(final VotingConfiguration votingConfiguration) { + synchronized (mutex) { + final ClusterState currentState = getStateForMasterService(); + + if (currentState.getLastAcceptedConfiguration().isEmpty() == false) { + throw new CoordinationStateRejectedException("Cannot set initial configuration: configuration has already been set"); + } + assert currentState.term() == 0 : currentState; + assert currentState.version() == 0 : currentState; + + if (mode != Mode.CANDIDATE) { + throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode); + } + + final List foundPeerIds = new ArrayList<>(); + foundPeerIds.add(getLocalNode().getId()); + peerFinder.getFoundPeers().forEach(peer -> foundPeerIds.add(peer.getId())); + if (votingConfiguration.hasQuorum(foundPeerIds) == false) { + throw new CoordinationStateRejectedException("Cannot set initial configuration: no quorum found yet"); + } + + logger.debug("setting initial configuration to {}", votingConfiguration); + final Builder builder = masterService.incrementVersion(currentState); + builder.lastAcceptedConfiguration(votingConfiguration); + builder.lastCommittedConfiguration(votingConfiguration); + coordinationState.get().setInitialState(builder.build()); + startElectionScheduler(); + preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version + } + } + // for tests boolean hasJoinVoteFrom(DiscoveryNode localNode) { return coordinationState.get().containsJoinVoteFor(localNode); @@ -731,25 +763,7 @@ protected void onFoundPeersUpdated() { if (foundQuorum) { if (electionScheduler == null) { - final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period - electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() { - @Override - public void run() { - synchronized (mutex) { - if (mode == Mode.CANDIDATE) { - if (prevotingRound != null) { - prevotingRound.close(); - } - prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); - } - } - } - - @Override - public String toString() { - return "scheduling of new prevoting round"; - } - }); + startElectionScheduler(); } } else { closePrevotingAndElectionScheduler(); @@ -759,6 +773,30 @@ public String toString() { } } + private void startElectionScheduler() { + assert electionScheduler == null : electionScheduler; + final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period + electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() { + @Override + public void run() { + synchronized (mutex) { + if (mode == Mode.CANDIDATE) { + if (prevotingRound != null) { + prevotingRound.close(); + } + final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); + prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); + } + } + } + + @Override + public String toString() { + return "scheduling of new prevoting round"; + } + }); + } + class CoordinatorPublication extends Publication { private final PublishRequest publishRequest; diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 59e4fc3852299..8719baeff9dbc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -329,7 +329,7 @@ private ClusterState patchVersions(ClusterState previousClusterState, ClusterTas return newClusterState; } - protected Builder incrementVersion(ClusterState clusterState) { + public Builder incrementVersion(ClusterState clusterState) { return ClusterState.builder(clusterState).incrementVersion(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c1377c7d8419b..08de15e195d98 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -51,6 +51,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -102,6 +103,7 @@ public void resetPortCounterBeforeEachTest() { public void testCanUpdateClusterStateAfterStabilisation() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -121,6 +123,7 @@ public void testCanUpdateClusterStateAfterStabilisation() { public void testNodesJoinAfterStableCluster() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm(); @@ -141,6 +144,7 @@ public void testNodesJoinAfterStableCluster() { public void testLeaderDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -177,6 +181,7 @@ public void testLeaderDisconnectionDetectedQuickly() { public void testUnresponsiveLeaderDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -219,6 +224,7 @@ public void testUnresponsiveLeaderDetectedEventually() { public void testFollowerDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -251,6 +257,7 @@ public void testFollowerDisconnectionDetectedQuickly() { public void testUnresponsiveFollowerDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -275,6 +282,7 @@ public void testUnresponsiveFollowerDetectedEventually() { public void testAckListenerReceivesAcksFromAllNodes() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); AckCollector ackCollector = leader.submitValue(randomLong()); @@ -289,6 +297,7 @@ public void testAckListenerReceivesAcksFromAllNodes() { public void testAckListenerReceivesNackFromFollower() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -306,6 +315,7 @@ public void testAckListenerReceivesNackFromFollower() { public void testAckListenerReceivesNackFromLeader() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -327,6 +337,7 @@ public void testAckListenerReceivesNackFromLeader() { public void testAckListenerReceivesNoAckFromHangingFollower() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -345,6 +356,7 @@ public void testAckListenerReceivesNoAckFromHangingFollower() { public void testAckListenerReceivesNacksIfPublicationTimesOut() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -404,6 +416,57 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { // assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); } + public void testSettingInitialConfigurationTriggersElection() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase"); + for (final ClusterNode clusterNode : cluster.clusterNodes) { + final String nodeId = clusterNode.getId(); + assertThat(nodeId + " is CANDIDATE", clusterNode.coordinator.getMode(), is(CANDIDATE)); + assertThat(nodeId + " is in term 0", clusterNode.coordinator.getCurrentTerm(), is(0L)); + assertThat(nodeId + " last accepted in term 0", clusterNode.coordinator.getLastAcceptedState().term(), is(0L)); + assertThat(nodeId + " last accepted version 0", clusterNode.coordinator.getLastAcceptedState().version(), is(0L)); + assertTrue(nodeId + " has an empty last-accepted configuration", + clusterNode.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty()); + assertTrue(nodeId + " has an empty last-committed configuration", + clusterNode.coordinator.getLastAcceptedState().getLastCommittedConfiguration().isEmpty()); + } + + cluster.getAnyNode().applyInitialConfiguration(); + cluster.stabilise(defaultMillis( + // the first election should succeed + ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately + // Allow two round-trip for pre-voting and voting + + 4 * DEFAULT_DELAY_VARIABILITY + // Then a commit of the new leader's first cluster state + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + } + + public void testCannotSetInitialConfigurationTwice() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); + cluster.stabilise(); + + final Coordinator coordinator = cluster.getAnyNode().coordinator; + final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(coordinator.getLastAcceptedState().getLastCommittedConfiguration())); + + assertThat(exception.getMessage(), is("Cannot set initial configuration: configuration has already been set")); + } + + public void testCannotSetInitialConfigurationWithoutQuorum() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + final Coordinator coordinator = cluster.getAnyNode().coordinator; + final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node")); + final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)); + assertThat(exception.getMessage(), is("Cannot set initial configuration: no quorum found yet")); + + // This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum. + coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()))); + cluster.stabilise(); + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -495,6 +558,18 @@ void addNodes(int newNodesCount) { } } + void setInitialConfigurationIfRequired() { + if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { + assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); + assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); + runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); + final ClusterNode bootstrapNode = getAnyNode(); + bootstrapNode.applyInitialConfiguration(); + } else { + logger.info("--> setting initial configuration not required"); + } + } + void runRandomly() { // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it @@ -555,6 +630,14 @@ void runRandomly() { } break; } + } else if (rarely()) { + final ClusterNode clusterNode = getAnyNode(); + onNode(clusterNode.getLocalNode(), + () -> { + logger.debug("----> [runRandomly {}] applying initial configuration {} to {}", + thisStep, initialConfiguration, clusterNode.getId()); + clusterNode.coordinator.setInitialConfiguration(initialConfiguration); + }).run(); } else { if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { deterministicTaskQueue.advanceTime(); @@ -566,7 +649,6 @@ void runRandomly() { // TODO other random steps: // - reboot a node // - abdicate leadership - // - bootstrap } catch (CoordinationStateRejectedException ignored) { // This is ok: it just means a message couldn't currently be handled. @@ -705,7 +787,7 @@ private void assertUniqueLeaderAndExpectedModes() { ClusterNode getAnyLeader() { List allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList()); - assertThat(allLeaders, not(empty())); + assertThat("leaders", allLeaders, not(empty())); return randomFrom(allLeaders); } @@ -758,8 +840,8 @@ class ClusterNode extends AbstractComponent { super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); this.nodeIndex = nodeIndex; localNode = createDiscoveryNode(); - persistedState = new InMemoryPersistedState(1L, - clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L)); + persistedState = new InMemoryPersistedState(0L, + clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); onNode(localNode, this::setUp).run(); } @@ -916,6 +998,17 @@ ClusterState getLastAppliedClusterState() { return clusterApplier.lastAppliedClusterState; } + void applyInitialConfiguration() { + onNode(localNode, () -> { + try { + coordinator.setInitialConfiguration(initialConfiguration); + logger.info("successfully set initial configuration to {}", initialConfiguration); + } catch (CoordinationStateRejectedException e) { + logger.info(new ParameterizedMessage("failed to set initial configuration to {}", initialConfiguration), e); + } + }).run(); + } + private class FakeClusterApplier implements ClusterApplier { final ClusterName clusterName; From 5c47f550140910e216e6fe00240a0e43cb9639ca Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 10:21:49 +0100 Subject: [PATCH 2/5] Better message in the case where a quorum has not been discovered --- .../cluster/coordination/Coordinator.java | 12 +++++++----- .../cluster/coordination/CoordinatorTests.java | 13 ++++++++++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index c193d6e89c070..82f8d32c8bbe0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -65,6 +65,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -560,11 +561,12 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode); } - final List foundPeerIds = new ArrayList<>(); - foundPeerIds.add(getLocalNode().getId()); - peerFinder.getFoundPeers().forEach(peer -> foundPeerIds.add(peer.getId())); - if (votingConfiguration.hasQuorum(foundPeerIds) == false) { - throw new CoordinationStateRejectedException("Cannot set initial configuration: no quorum found yet"); + final List knownNodes = new ArrayList<>(); + knownNodes.add(getLocalNode()); + peerFinder.getFoundPeers().forEach(knownNodes::add); + if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) { + throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " + + "[knownNodes=" + knownNodes + ", " + votingConfiguration + "]"); } logger.debug("setting initial configuration to {}", votingConfiguration); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 08de15e195d98..a59c1d9e4085f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -85,12 +85,15 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; @TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") public class CoordinatorTests extends ESTestCase { @@ -458,9 +461,13 @@ public void testCannotSetInitialConfigurationWithoutQuorum() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); final Coordinator coordinator = cluster.getAnyNode().coordinator; final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node")); - final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class, - () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)); - assertThat(exception.getMessage(), is("Cannot set initial configuration: no quorum found yet")); + final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage(); + assertThat(exceptionMessage, + startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=[")); + assertThat(exceptionMessage, + endsWith("], VotingConfiguration{unknown-node}]")); + assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString())); // This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum. coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()))); From dbb2b1ab1739456cdfa7edcfed7adf48d290ab6f Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 10:27:21 +0100 Subject: [PATCH 3/5] Review feedback --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 82f8d32c8bbe0..3d5e8900738ca 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -569,13 +569,13 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio "[knownNodes=" + knownNodes + ", " + votingConfiguration + "]"); } - logger.debug("setting initial configuration to {}", votingConfiguration); + logger.info("setting initial configuration to {}", votingConfiguration); final Builder builder = masterService.incrementVersion(currentState); builder.lastAcceptedConfiguration(votingConfiguration); builder.lastCommittedConfiguration(votingConfiguration); coordinationState.get().setInitialState(builder.build()); - startElectionScheduler(); preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version + startElectionScheduler(); } } From ef712a50a6904a7c5b4aa1132e6fbec1c54a65c1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 10:29:14 +0100 Subject: [PATCH 4/5] Set initial configuration at the start of stabilisation --- .../coordination/CoordinatorTests.java | 35 ++++++------------- 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index a59c1d9e4085f..690b393b4d196 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -106,7 +106,6 @@ public void resetPortCounterBeforeEachTest() { public void testCanUpdateClusterStateAfterStabilisation() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -126,7 +125,6 @@ public void testCanUpdateClusterStateAfterStabilisation() { public void testNodesJoinAfterStableCluster() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm(); @@ -147,7 +145,6 @@ public void testNodesJoinAfterStableCluster() { public void testLeaderDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -184,7 +181,6 @@ public void testLeaderDisconnectionDetectedQuickly() { public void testUnresponsiveLeaderDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -227,7 +223,6 @@ public void testUnresponsiveLeaderDetectedEventually() { public void testFollowerDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -260,7 +255,6 @@ public void testFollowerDisconnectionDetectedQuickly() { public void testUnresponsiveFollowerDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -285,7 +279,6 @@ public void testUnresponsiveFollowerDetectedEventually() { public void testAckListenerReceivesAcksFromAllNodes() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); AckCollector ackCollector = leader.submitValue(randomLong()); @@ -300,7 +293,6 @@ public void testAckListenerReceivesAcksFromAllNodes() { public void testAckListenerReceivesNackFromFollower() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -318,7 +310,6 @@ public void testAckListenerReceivesNackFromFollower() { public void testAckListenerReceivesNackFromLeader() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -340,7 +331,6 @@ public void testAckListenerReceivesNackFromLeader() { public void testAckListenerReceivesNoAckFromHangingFollower() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -359,7 +349,6 @@ public void testAckListenerReceivesNoAckFromHangingFollower() { public void testAckListenerReceivesNacksIfPublicationTimesOut() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -447,7 +436,6 @@ public void testSettingInitialConfigurationTriggersElection() { public void testCannotSetInitialConfigurationTwice() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final Coordinator coordinator = cluster.getAnyNode().coordinator; @@ -565,18 +553,6 @@ void addNodes(int newNodesCount) { } } - void setInitialConfigurationIfRequired() { - if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { - assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); - assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); - runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); - final ClusterNode bootstrapNode = getAnyNode(); - bootstrapNode.applyInitialConfiguration(); - } else { - logger.info("--> setting initial configuration not required"); - } - } - void runRandomly() { // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it @@ -695,6 +671,17 @@ void stabilise() { void stabilise(long stabilisationDurationMillis) { assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); + + if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { + assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); + assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); + runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); + final ClusterNode bootstrapNode = getAnyNode(); + bootstrapNode.applyInitialConfiguration(); + } else { + logger.info("setting initial configuration not required"); + } + runFor(stabilisationDurationMillis, "stabilising"); fixLag(); assertUniqueLeaderAndExpectedModes(); From dcbacd8c1ad079187bbe05c76a10f9ebbfe44a44 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 10:31:00 +0100 Subject: [PATCH 5/5] Describe why the first election should succeed --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 690b393b4d196..5eec9703d08d7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -425,7 +425,8 @@ public void testSettingInitialConfigurationTriggersElection() { cluster.getAnyNode().applyInitialConfiguration(); cluster.stabilise(defaultMillis( - // the first election should succeed + // the first election should succeed, because only one node knows of the initial configuration and therefore can win a + // pre-voting round and proceed to an election, so there cannot be any collisions ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately // Allow two round-trip for pre-voting and voting + 4 * DEFAULT_DELAY_VARIABILITY