diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index d5b9cdf6adfc3..fb7071e70806a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -132,17 +132,17 @@ public void setInitialState(ClusterState initialState) { throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion); } - assert getLastAcceptedTerm() == 0; - assert getLastAcceptedConfiguration().isEmpty(); - assert getLastCommittedConfiguration().isEmpty(); - assert lastPublishedVersion == 0; - assert lastPublishedConfiguration.isEmpty(); + assert getLastAcceptedTerm() == 0 : getLastAcceptedTerm(); + assert getLastAcceptedConfiguration().isEmpty() : getLastAcceptedConfiguration(); + assert getLastCommittedConfiguration().isEmpty() : getLastCommittedConfiguration(); + assert lastPublishedVersion == 0 : lastAcceptedVersion; + assert lastPublishedConfiguration.isEmpty() : lastPublishedConfiguration; assert electionWon == false; - assert joinVotes.isEmpty(); - assert publishVotes.isEmpty(); + assert joinVotes.isEmpty() : joinVotes; + assert publishVotes.isEmpty() : publishVotes; - assert initialState.term() == 0; - assert initialState.version() == 1; + assert initialState.term() == 0 : initialState; + assert initialState.version() == 1 : initialState; assert initialState.getLastAcceptedConfiguration().isEmpty() == false; assert initialState.getLastCommittedConfiguration().isEmpty() == false; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index a8e21da705ce7..3d5e8900738ca 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -24,6 +24,8 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.Builder; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; @@ -63,6 +65,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -480,6 +483,8 @@ public void invariant() { assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id()); + assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) + : preVoteCollector + " vs " + getPreVoteResponse(); if (mode == Mode.LEADER) { final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); @@ -493,7 +498,6 @@ public void invariant() { assert leaderCheckScheduler == null : leaderCheckScheduler; assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false); if (becomingMaster && activePublication == false) { @@ -527,7 +531,6 @@ public void invariant() { assert followersChecker.getKnownFollowers().isEmpty(); assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; @@ -540,11 +543,42 @@ public void invariant() { assert applierState.nodes().getMasterNodeId() == null; assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader() == null : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; } } } + public void setInitialConfiguration(final VotingConfiguration votingConfiguration) { + synchronized (mutex) { + final ClusterState currentState = getStateForMasterService(); + + if (currentState.getLastAcceptedConfiguration().isEmpty() == false) { + throw new CoordinationStateRejectedException("Cannot set initial configuration: configuration has already been set"); + } + assert currentState.term() == 0 : currentState; + assert currentState.version() == 0 : currentState; + + if (mode != Mode.CANDIDATE) { + throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode); + } + + final List knownNodes = new ArrayList<>(); + knownNodes.add(getLocalNode()); + peerFinder.getFoundPeers().forEach(knownNodes::add); + if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) { + throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " + + "[knownNodes=" + knownNodes + ", " + votingConfiguration + "]"); + } + + logger.info("setting initial configuration to {}", votingConfiguration); + final Builder builder = masterService.incrementVersion(currentState); + builder.lastAcceptedConfiguration(votingConfiguration); + builder.lastCommittedConfiguration(votingConfiguration); + coordinationState.get().setInitialState(builder.build()); + preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version + startElectionScheduler(); + } + } + // for tests boolean hasJoinVoteFrom(DiscoveryNode localNode) { return coordinationState.get().containsJoinVoteFor(localNode); @@ -731,25 +765,7 @@ protected void onFoundPeersUpdated() { if (foundQuorum) { if (electionScheduler == null) { - final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period - electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() { - @Override - public void run() { - synchronized (mutex) { - if (mode == Mode.CANDIDATE) { - if (prevotingRound != null) { - prevotingRound.close(); - } - prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); - } - } - } - - @Override - public String toString() { - return "scheduling of new prevoting round"; - } - }); + startElectionScheduler(); } } else { closePrevotingAndElectionScheduler(); @@ -759,6 +775,30 @@ public String toString() { } } + private void startElectionScheduler() { + assert electionScheduler == null : electionScheduler; + final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period + electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() { + @Override + public void run() { + synchronized (mutex) { + if (mode == Mode.CANDIDATE) { + if (prevotingRound != null) { + prevotingRound.close(); + } + final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); + prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); + } + } + } + + @Override + public String toString() { + return "scheduling of new prevoting round"; + } + }); + } + class CoordinatorPublication extends Publication { private final PublishRequest publishRequest; diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 59e4fc3852299..8719baeff9dbc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -329,7 +329,7 @@ private ClusterState patchVersions(ClusterState previousClusterState, ClusterTas return newClusterState; } - protected Builder incrementVersion(ClusterState clusterState) { + public Builder incrementVersion(ClusterState clusterState) { return ClusterState.builder(clusterState).incrementVersion(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c1377c7d8419b..5eec9703d08d7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -51,6 +51,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -84,12 +85,15 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; @TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") public class CoordinatorTests extends ESTestCase { @@ -404,6 +408,61 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { // assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); } + public void testSettingInitialConfigurationTriggersElection() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase"); + for (final ClusterNode clusterNode : cluster.clusterNodes) { + final String nodeId = clusterNode.getId(); + assertThat(nodeId + " is CANDIDATE", clusterNode.coordinator.getMode(), is(CANDIDATE)); + assertThat(nodeId + " is in term 0", clusterNode.coordinator.getCurrentTerm(), is(0L)); + assertThat(nodeId + " last accepted in term 0", clusterNode.coordinator.getLastAcceptedState().term(), is(0L)); + assertThat(nodeId + " last accepted version 0", clusterNode.coordinator.getLastAcceptedState().version(), is(0L)); + assertTrue(nodeId + " has an empty last-accepted configuration", + clusterNode.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty()); + assertTrue(nodeId + " has an empty last-committed configuration", + clusterNode.coordinator.getLastAcceptedState().getLastCommittedConfiguration().isEmpty()); + } + + cluster.getAnyNode().applyInitialConfiguration(); + cluster.stabilise(defaultMillis( + // the first election should succeed, because only one node knows of the initial configuration and therefore can win a + // pre-voting round and proceed to an election, so there cannot be any collisions + ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately + // Allow two round-trip for pre-voting and voting + + 4 * DEFAULT_DELAY_VARIABILITY + // Then a commit of the new leader's first cluster state + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + } + + public void testCannotSetInitialConfigurationTwice() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runRandomly(); + cluster.stabilise(); + + final Coordinator coordinator = cluster.getAnyNode().coordinator; + final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(coordinator.getLastAcceptedState().getLastCommittedConfiguration())); + + assertThat(exception.getMessage(), is("Cannot set initial configuration: configuration has already been set")); + } + + public void testCannotSetInitialConfigurationWithoutQuorum() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + final Coordinator coordinator = cluster.getAnyNode().coordinator; + final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node")); + final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage(); + assertThat(exceptionMessage, + startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=[")); + assertThat(exceptionMessage, + endsWith("], VotingConfiguration{unknown-node}]")); + assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString())); + + // This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum. + coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()))); + cluster.stabilise(); + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -555,6 +614,14 @@ void runRandomly() { } break; } + } else if (rarely()) { + final ClusterNode clusterNode = getAnyNode(); + onNode(clusterNode.getLocalNode(), + () -> { + logger.debug("----> [runRandomly {}] applying initial configuration {} to {}", + thisStep, initialConfiguration, clusterNode.getId()); + clusterNode.coordinator.setInitialConfiguration(initialConfiguration); + }).run(); } else { if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { deterministicTaskQueue.advanceTime(); @@ -566,7 +633,6 @@ void runRandomly() { // TODO other random steps: // - reboot a node // - abdicate leadership - // - bootstrap } catch (CoordinationStateRejectedException ignored) { // This is ok: it just means a message couldn't currently be handled. @@ -606,6 +672,17 @@ void stabilise() { void stabilise(long stabilisationDurationMillis) { assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); + + if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { + assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); + assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); + runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); + final ClusterNode bootstrapNode = getAnyNode(); + bootstrapNode.applyInitialConfiguration(); + } else { + logger.info("setting initial configuration not required"); + } + runFor(stabilisationDurationMillis, "stabilising"); fixLag(); assertUniqueLeaderAndExpectedModes(); @@ -705,7 +782,7 @@ private void assertUniqueLeaderAndExpectedModes() { ClusterNode getAnyLeader() { List allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList()); - assertThat(allLeaders, not(empty())); + assertThat("leaders", allLeaders, not(empty())); return randomFrom(allLeaders); } @@ -758,8 +835,8 @@ class ClusterNode extends AbstractComponent { super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); this.nodeIndex = nodeIndex; localNode = createDiscoveryNode(); - persistedState = new InMemoryPersistedState(1L, - clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L)); + persistedState = new InMemoryPersistedState(0L, + clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); onNode(localNode, this::setUp).run(); } @@ -916,6 +993,17 @@ ClusterState getLastAppliedClusterState() { return clusterApplier.lastAppliedClusterState; } + void applyInitialConfiguration() { + onNode(localNode, () -> { + try { + coordinator.setInitialConfiguration(initialConfiguration); + logger.info("successfully set initial configuration to {}", initialConfiguration); + } catch (CoordinationStateRejectedException e) { + logger.info(new ParameterizedMessage("failed to set initial configuration to {}", initialConfiguration), e); + } + }).run(); + } + private class FakeClusterApplier implements ClusterApplier { final ClusterName clusterName;