Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zen2] Add low-level bootstrap implementation #34345

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,17 @@ public void setInitialState(ClusterState initialState) {
throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion);
}

assert getLastAcceptedTerm() == 0;
assert getLastAcceptedConfiguration().isEmpty();
assert getLastCommittedConfiguration().isEmpty();
assert lastPublishedVersion == 0;
assert lastPublishedConfiguration.isEmpty();
assert getLastAcceptedTerm() == 0 : getLastAcceptedTerm();
assert getLastAcceptedConfiguration().isEmpty() : getLastAcceptedConfiguration();
assert getLastCommittedConfiguration().isEmpty() : getLastCommittedConfiguration();
assert lastPublishedVersion == 0 : lastAcceptedVersion;
assert lastPublishedConfiguration.isEmpty() : lastPublishedConfiguration;
assert electionWon == false;
assert joinVotes.isEmpty();
assert publishVotes.isEmpty();
assert joinVotes.isEmpty() : joinVotes;
assert publishVotes.isEmpty() : publishVotes;

assert initialState.term() == 0;
assert initialState.version() == 1;
assert initialState.term() == 0 : initialState;
assert initialState.version() == 1 : initialState;
assert initialState.getLastAcceptedConfiguration().isEmpty() == false;
assert initialState.getLastCommittedConfiguration().isEmpty() == false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.Builder;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
Expand Down Expand Up @@ -63,6 +65,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
Expand Down Expand Up @@ -480,6 +483,8 @@ public void invariant() {
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id());
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
: preVoteCollector + " vs " + getPreVoteResponse();
if (mode == Mode.LEADER) {
final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();

Expand All @@ -493,7 +498,6 @@ public void invariant() {
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;

final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false);
if (becomingMaster && activePublication == false) {
Expand Down Expand Up @@ -527,7 +531,6 @@ public void invariant() {
assert followersChecker.getKnownFollowers().isEmpty();
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
Expand All @@ -540,11 +543,42 @@ public void invariant() {
assert applierState.nodes().getMasterNodeId() == null;
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader() == null : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
}
}
}

public void setInitialConfiguration(final VotingConfiguration votingConfiguration) {
synchronized (mutex) {
final ClusterState currentState = getStateForMasterService();

if (currentState.getLastAcceptedConfiguration().isEmpty() == false) {
throw new CoordinationStateRejectedException("Cannot set initial configuration: configuration has already been set");
}
assert currentState.term() == 0 : currentState;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this duplicating the checks in CoordinationState.setInitialState?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, except that these checks happen earlier.

assert currentState.version() == 0 : currentState;

if (mode != Mode.CANDIDATE) {
throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode);
}

final List<DiscoveryNode> knownNodes = new ArrayList<>();
knownNodes.add(getLocalNode());
peerFinder.getFoundPeers().forEach(knownNodes::add);
if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) {
throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " +
"[knownNodes=" + knownNodes + ", " + votingConfiguration + "]");
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
}

logger.info("setting initial configuration to {}", votingConfiguration);
final Builder builder = masterService.incrementVersion(currentState);
builder.lastAcceptedConfiguration(votingConfiguration);
builder.lastCommittedConfiguration(votingConfiguration);
coordinationState.get().setInitialState(builder.build());
preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
startElectionScheduler();
}
}

// for tests
boolean hasJoinVoteFrom(DiscoveryNode localNode) {
return coordinationState.get().containsJoinVoteFor(localNode);
Expand Down Expand Up @@ -731,25 +765,7 @@ protected void onFoundPeersUpdated() {

if (foundQuorum) {
if (electionScheduler == null) {
final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
if (prevotingRound != null) {
prevotingRound.close();
}
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
}
}
}

@Override
public String toString() {
return "scheduling of new prevoting round";
}
});
startElectionScheduler();
}
} else {
closePrevotingAndElectionScheduler();
Expand All @@ -759,6 +775,30 @@ public String toString() {
}
}

private void startElectionScheduler() {
assert electionScheduler == null : electionScheduler;
final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
if (prevotingRound != null) {
prevotingRound.close();
}
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
}
}
}

@Override
public String toString() {
return "scheduling of new prevoting round";
}
});
}

class CoordinatorPublication extends Publication {

private final PublishRequest publishRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ private ClusterState patchVersions(ClusterState previousClusterState, ClusterTas
return newClusterState;
}

protected Builder incrementVersion(ClusterState clusterState) {
public Builder incrementVersion(ClusterState clusterState) {
return ClusterState.builder(clusterState).incrementVersion();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -84,12 +85,15 @@
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;

@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE")
public class CoordinatorTests extends ESTestCase {
Expand Down Expand Up @@ -404,6 +408,61 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() {
// assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
}

public void testSettingInitialConfigurationTriggersElection() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase");
for (final ClusterNode clusterNode : cluster.clusterNodes) {
final String nodeId = clusterNode.getId();
assertThat(nodeId + " is CANDIDATE", clusterNode.coordinator.getMode(), is(CANDIDATE));
assertThat(nodeId + " is in term 0", clusterNode.coordinator.getCurrentTerm(), is(0L));
assertThat(nodeId + " last accepted in term 0", clusterNode.coordinator.getLastAcceptedState().term(), is(0L));
assertThat(nodeId + " last accepted version 0", clusterNode.coordinator.getLastAcceptedState().version(), is(0L));
assertTrue(nodeId + " has an empty last-accepted configuration",
clusterNode.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty());
assertTrue(nodeId + " has an empty last-committed configuration",
clusterNode.coordinator.getLastAcceptedState().getLastCommittedConfiguration().isEmpty());
}

cluster.getAnyNode().applyInitialConfiguration();
cluster.stabilise(defaultMillis(
// the first election should succeed, because only one node knows of the initial configuration and therefore can win a
// pre-voting round and proceed to an election, so there cannot be any collisions
ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does the first election succeed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case only one node has a nonempty configuration, so it's the only one that can pass through prevoting and win the election, so there's no election collisions. I expanded the comment in dcbacd8.

// Allow two round-trip for pre-voting and voting
+ 4 * DEFAULT_DELAY_VARIABILITY
// Then a commit of the new leader's first cluster state
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
}

public void testCannotSetInitialConfigurationTwice() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly();
cluster.stabilise();

final Coordinator coordinator = cluster.getAnyNode().coordinator;
final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class,
() -> coordinator.setInitialConfiguration(coordinator.getLastAcceptedState().getLastCommittedConfiguration()));

assertThat(exception.getMessage(), is("Cannot set initial configuration: configuration has already been set"));
}

public void testCannotSetInitialConfigurationWithoutQuorum() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
final Coordinator coordinator = cluster.getAnyNode().coordinator;
final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node"));
final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class,
() -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage();
assertThat(exceptionMessage,
startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=["));
assertThat(exceptionMessage,
endsWith("], VotingConfiguration{unknown-node}]"));
assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString()));

// This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum.
coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId())));
cluster.stabilise();
}

private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}
Expand Down Expand Up @@ -555,6 +614,14 @@ void runRandomly() {
}
break;
}
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();
onNode(clusterNode.getLocalNode(),
() -> {
logger.debug("----> [runRandomly {}] applying initial configuration {} to {}",
thisStep, initialConfiguration, clusterNode.getId());
clusterNode.coordinator.setInitialConfiguration(initialConfiguration);
}).run();
} else {
if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) {
deterministicTaskQueue.advanceTime();
Expand All @@ -566,7 +633,6 @@ void runRandomly() {
// TODO other random steps:
// - reboot a node
// - abdicate leadership
// - bootstrap

} catch (CoordinationStateRejectedException ignored) {
// This is ok: it just means a message couldn't currently be handled.
Expand Down Expand Up @@ -606,6 +672,17 @@ void stabilise() {
void stabilise(long stabilisationDurationMillis) {
assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)",
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));

if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
final ClusterNode bootstrapNode = getAnyNode();
bootstrapNode.applyInitialConfiguration();
} else {
logger.info("setting initial configuration not required");
}

runFor(stabilisationDurationMillis, "stabilising");
fixLag();
assertUniqueLeaderAndExpectedModes();
Expand Down Expand Up @@ -705,7 +782,7 @@ private void assertUniqueLeaderAndExpectedModes() {

ClusterNode getAnyLeader() {
List<ClusterNode> allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList());
assertThat(allLeaders, not(empty()));
assertThat("leaders", allLeaders, not(empty()));
return randomFrom(allLeaders);
}

Expand Down Expand Up @@ -758,8 +835,8 @@ class ClusterNode extends AbstractComponent {
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
this.nodeIndex = nodeIndex;
localNode = createDiscoveryNode();
persistedState = new InMemoryPersistedState(1L,
clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L));
persistedState = new InMemoryPersistedState(0L,
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
onNode(localNode, this::setUp).run();
}

Expand Down Expand Up @@ -916,6 +993,17 @@ ClusterState getLastAppliedClusterState() {
return clusterApplier.lastAppliedClusterState;
}

void applyInitialConfiguration() {
onNode(localNode, () -> {
try {
coordinator.setInitialConfiguration(initialConfiguration);
logger.info("successfully set initial configuration to {}", initialConfiguration);
} catch (CoordinationStateRejectedException e) {
logger.info(new ParameterizedMessage("failed to set initial configuration to {}", initialConfiguration), e);
}
}).run();
}

private class FakeClusterApplier implements ClusterApplier {

final ClusterName clusterName;
Expand Down