Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Step down as master when configured out of voting configuration #37802

Merged
merged 13 commits into from
Jan 29, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ The node that should be added to the exclusions list is specified using
<<cluster-nodes,node filters>> in place of `node_name` here. If a call to the
voting configuration exclusions API fails, you can safely retry it. Only a
successful response guarantees that the node has actually been removed from the
voting configuration and will not be reinstated.
voting configuration and will not be reinstated. If it's the active master that
was removed from the voting configuration, then it will abdicate to another
master-eligible node that's still in the voting configuration, if such a node
is available.

Although the voting configuration exclusions API is most useful for down-scaling
a two-node to a one-node cluster, it is also possible to use it to remove
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery

private final PeerFinder peerFinder;
private final PreVoteCollector preVoteCollector;
private final Random random;
private final ElectionSchedulerFactory electionSchedulerFactory;
private final UnicastConfiguredHostsResolver configuredHostsResolver;
private final TimeValue publishTimeout;
Expand Down Expand Up @@ -153,6 +154,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.lastJoin = Optional.empty();
this.joinAccumulator = new InitialJoinAccumulator();
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen);
configuredHostsResolver = new UnicastConfiguredHostsResolver(nodeName, settings, transportService, unicastHostsProvider);
Expand Down Expand Up @@ -366,11 +368,33 @@ private void startElection() {
}
}

private void abdicateTo(DiscoveryNode newMaster) {
assert Thread.holdsLock(mutex);
assert mode == Mode.LEADER : "expected to be leader on abdication but was " + mode;
assert newMaster.isMasterNode() : "should only abdicate to master-eligible node but was " + newMaster;
final StartJoinRequest startJoinRequest = new StartJoinRequest(newMaster, Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.info("abdicating to {} with term {}", newMaster, startJoinRequest.getTerm());
getLastAcceptedState().nodes().mastersFirstStream().forEach(node -> {
if (isZen1Node(node) == false) {
joinHelper.sendStartJoinRequest(startJoinRequest, node);
}
});
// handling of start join messages on the local node will be dispatched to the generic thread-pool
assert mode == Mode.LEADER : "should still be leader after sending abdication messages " + mode;
// explicitly move node to candidate state so that the next cluster state update task yields an onNoLongerMaster event
becomeCandidate("after abdicating to " + newMaster);
}

private static boolean electionQuorumContainsLocalNode(ClusterState lastAcceptedState) {
final String localNodeId = lastAcceptedState.nodes().getLocalNodeId();
assert localNodeId != null;
return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(localNodeId)
|| lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(localNodeId);
final DiscoveryNode localNode = lastAcceptedState.nodes().getLocalNode();
assert localNode != null;
return electionQuorumContains(lastAcceptedState, localNode);
}

private static boolean electionQuorumContains(ClusterState lastAcceptedState, DiscoveryNode node) {
final String nodeId = node.getId();
return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(nodeId)
|| lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(nodeId);
}

private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
Expand Down Expand Up @@ -1184,7 +1208,18 @@ public void onSuccess(String source) {
updateMaxTermSeen(getCurrentTerm());

if (mode == Mode.LEADER) {
scheduleReconfigurationIfNeeded();
final ClusterState state = getLastAcceptedState(); // committed state
if (electionQuorumContainsLocalNode(state) == false) {
final List<DiscoveryNode> masterCandidates = completedNodes().stream()
.filter(DiscoveryNode::isMasterNode)
.filter(node -> electionQuorumContains(state, node))
.collect(Collectors.toList());
if (masterCandidates.isEmpty() == false) {
abdicateTo(masterCandidates.get(random.nextInt(masterCandidates.size())));
}
} else {
scheduleReconfigurationIfNeeded();
}
}
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

public abstract class Publication {

Expand Down Expand Up @@ -92,6 +93,13 @@ public void onFaultyNode(DiscoveryNode faultyNode) {
onPossibleCompletion();
}

public List<DiscoveryNode> completedNodes() {
return publicationTargets.stream()
.filter(PublicationTarget::isSuccessfullyCompleted)
.map(PublicationTarget::getDiscoveryNode)
.collect(Collectors.toList());
}

public boolean isCommitted() {
return applyCommitRequest.isPresent();
}
Expand Down Expand Up @@ -268,6 +276,10 @@ void onFaultyNode(DiscoveryNode faultyNode) {
}
}

DiscoveryNode getDiscoveryNode() {
return discoveryNode;
}

private void ackOnce(Exception e) {
if (ackIsPending) {
ackIsPending = false;
Expand All @@ -280,6 +292,10 @@ boolean isActive() {
&& state != PublicationTargetState.APPLIED_COMMIT;
}

boolean isSuccessfullyCompleted() {
return state == PublicationTargetState.APPLIED_COMMIT;
}

boolean isWaitingForQuorum() {
return state == PublicationTargetState.WAITING_FOR_QUORUM;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -1328,6 +1329,8 @@ void stabilise(long stabilisationDurationMillis) {
final VotingConfiguration lastCommittedConfiguration = lastAcceptedState.getLastCommittedConfiguration();
assertTrue(connectedNodeIds + " should be a quorum of " + lastCommittedConfiguration,
lastCommittedConfiguration.hasQuorum(connectedNodeIds));
assertThat("leader " + leader.getLocalNode() + " should be part of voting configuration " + lastCommittedConfiguration,
lastCommittedConfiguration.getNodeIds(), IsCollectionContaining.hasItem(leader.getLocalNode().getId()));

assertThat("no reconfiguration is in progress",
lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;

public class PublicationTests extends ESTestCase {
Expand Down Expand Up @@ -178,6 +179,7 @@ public void testSimpleClusterStatePublishing() throws InterruptedException {
discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet());

assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes));
assertThat(publication.completedNodes(), empty());
assertTrue(publication.pendingCommits.isEmpty());
AtomicBoolean processedNode1PublishResponse = new AtomicBoolean();
boolean delayProcessingNode2PublishResponse = randomBoolean();
Expand Down Expand Up @@ -232,10 +234,12 @@ public void testSimpleClusterStatePublishing() throws InterruptedException {

assertFalse(publication.completed);
assertFalse(publication.committed);
assertThat(publication.completedNodes(), containsInAnyOrder(n1, n3));
publication.pendingCommits.get(n2).onResponse(TransportResponse.Empty.INSTANCE);
}

assertTrue(publication.completed);
assertThat(publication.completedNodes(), containsInAnyOrder(n1, n2, n3));
assertTrue(publication.committed);

assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.elasticsearch.common.Priority;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.concurrent.ExecutionException;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class VotingConfigurationIT extends ESIntegTestCase {

public void testAbdicateAfterVotingConfigExclusionAdded() throws ExecutionException, InterruptedException {
internalCluster().startNodes(2);
final String originalMaster = internalCluster().getMasterName();

logger.info("--> excluding master node {}", originalMaster);
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
new AddVotingConfigExclusionsRequest(new String[]{originalMaster})).get();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
assertNotEquals(originalMaster, internalCluster().getMasterName());
}
}