-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Zen2] Add low-level bootstrap implementation #34345
Changes from all commits
5c20e63
5c47f55
dbb2b1a
ef712a5
dcbacd8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,7 @@ | |
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
|
@@ -84,12 +85,15 @@ | |
import static org.elasticsearch.node.Node.NODE_NAME_SETTING; | ||
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; | ||
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; | ||
import static org.hamcrest.Matchers.containsString; | ||
import static org.hamcrest.Matchers.empty; | ||
import static org.hamcrest.Matchers.endsWith; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.greaterThanOrEqualTo; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.hamcrest.Matchers.lessThanOrEqualTo; | ||
import static org.hamcrest.Matchers.not; | ||
import static org.hamcrest.Matchers.startsWith; | ||
|
||
@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") | ||
public class CoordinatorTests extends ESTestCase { | ||
|
@@ -404,6 +408,61 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { | |
// assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); | ||
} | ||
|
||
public void testSettingInitialConfigurationTriggersElection() { | ||
final Cluster cluster = new Cluster(randomIntBetween(1, 5)); | ||
cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase"); | ||
for (final ClusterNode clusterNode : cluster.clusterNodes) { | ||
final String nodeId = clusterNode.getId(); | ||
assertThat(nodeId + " is CANDIDATE", clusterNode.coordinator.getMode(), is(CANDIDATE)); | ||
assertThat(nodeId + " is in term 0", clusterNode.coordinator.getCurrentTerm(), is(0L)); | ||
assertThat(nodeId + " last accepted in term 0", clusterNode.coordinator.getLastAcceptedState().term(), is(0L)); | ||
assertThat(nodeId + " last accepted version 0", clusterNode.coordinator.getLastAcceptedState().version(), is(0L)); | ||
assertTrue(nodeId + " has an empty last-accepted configuration", | ||
clusterNode.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty()); | ||
assertTrue(nodeId + " has an empty last-committed configuration", | ||
clusterNode.coordinator.getLastAcceptedState().getLastCommittedConfiguration().isEmpty()); | ||
} | ||
|
||
cluster.getAnyNode().applyInitialConfiguration(); | ||
cluster.stabilise(defaultMillis( | ||
// the first election should succeed, because only one node knows of the initial configuration and therefore can win a | ||
// pre-voting round and proceed to an election, so there cannot be any collisions | ||
ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why does the first election succeed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case only one node has a nonempty configuration, so it's the only one that can pass through prevoting and win the election, so there's no election collisions. I expanded the comment in dcbacd8. |
||
// Allow two round-trip for pre-voting and voting | ||
+ 4 * DEFAULT_DELAY_VARIABILITY | ||
// Then a commit of the new leader's first cluster state | ||
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY); | ||
} | ||
|
||
public void testCannotSetInitialConfigurationTwice() { | ||
final Cluster cluster = new Cluster(randomIntBetween(1, 5)); | ||
cluster.runRandomly(); | ||
cluster.stabilise(); | ||
|
||
final Coordinator coordinator = cluster.getAnyNode().coordinator; | ||
final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class, | ||
() -> coordinator.setInitialConfiguration(coordinator.getLastAcceptedState().getLastCommittedConfiguration())); | ||
|
||
assertThat(exception.getMessage(), is("Cannot set initial configuration: configuration has already been set")); | ||
} | ||
|
||
public void testCannotSetInitialConfigurationWithoutQuorum() { | ||
final Cluster cluster = new Cluster(randomIntBetween(1, 5)); | ||
final Coordinator coordinator = cluster.getAnyNode().coordinator; | ||
final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node")); | ||
final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class, | ||
() -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage(); | ||
assertThat(exceptionMessage, | ||
startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=[")); | ||
assertThat(exceptionMessage, | ||
endsWith("], VotingConfiguration{unknown-node}]")); | ||
assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString())); | ||
|
||
// This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum. | ||
coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()))); | ||
cluster.stabilise(); | ||
} | ||
|
||
private static long defaultMillis(Setting<TimeValue> setting) { | ||
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; | ||
} | ||
|
@@ -555,6 +614,14 @@ void runRandomly() { | |
} | ||
break; | ||
} | ||
} else if (rarely()) { | ||
final ClusterNode clusterNode = getAnyNode(); | ||
onNode(clusterNode.getLocalNode(), | ||
() -> { | ||
logger.debug("----> [runRandomly {}] applying initial configuration {} to {}", | ||
thisStep, initialConfiguration, clusterNode.getId()); | ||
clusterNode.coordinator.setInitialConfiguration(initialConfiguration); | ||
}).run(); | ||
} else { | ||
if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { | ||
deterministicTaskQueue.advanceTime(); | ||
|
@@ -566,7 +633,6 @@ void runRandomly() { | |
// TODO other random steps: | ||
// - reboot a node | ||
// - abdicate leadership | ||
// - bootstrap | ||
|
||
} catch (CoordinationStateRejectedException ignored) { | ||
// This is ok: it just means a message couldn't currently be handled. | ||
|
@@ -606,6 +672,17 @@ void stabilise() { | |
void stabilise(long stabilisationDurationMillis) { | ||
assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", | ||
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); | ||
|
||
if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { | ||
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); | ||
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); | ||
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); | ||
final ClusterNode bootstrapNode = getAnyNode(); | ||
bootstrapNode.applyInitialConfiguration(); | ||
} else { | ||
logger.info("setting initial configuration not required"); | ||
} | ||
|
||
runFor(stabilisationDurationMillis, "stabilising"); | ||
fixLag(); | ||
assertUniqueLeaderAndExpectedModes(); | ||
|
@@ -705,7 +782,7 @@ private void assertUniqueLeaderAndExpectedModes() { | |
|
||
ClusterNode getAnyLeader() { | ||
List<ClusterNode> allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList()); | ||
assertThat(allLeaders, not(empty())); | ||
assertThat("leaders", allLeaders, not(empty())); | ||
return randomFrom(allLeaders); | ||
} | ||
|
||
|
@@ -758,8 +835,8 @@ class ClusterNode extends AbstractComponent { | |
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); | ||
this.nodeIndex = nodeIndex; | ||
localNode = createDiscoveryNode(); | ||
persistedState = new InMemoryPersistedState(1L, | ||
clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L)); | ||
persistedState = new InMemoryPersistedState(0L, | ||
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); | ||
onNode(localNode, this::setUp).run(); | ||
} | ||
|
||
|
@@ -916,6 +993,17 @@ ClusterState getLastAppliedClusterState() { | |
return clusterApplier.lastAppliedClusterState; | ||
} | ||
|
||
void applyInitialConfiguration() { | ||
onNode(localNode, () -> { | ||
try { | ||
coordinator.setInitialConfiguration(initialConfiguration); | ||
logger.info("successfully set initial configuration to {}", initialConfiguration); | ||
} catch (CoordinationStateRejectedException e) { | ||
logger.info(new ParameterizedMessage("failed to set initial configuration to {}", initialConfiguration), e); | ||
} | ||
}).run(); | ||
} | ||
|
||
private class FakeClusterApplier implements ClusterApplier { | ||
|
||
final ClusterName clusterName; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this duplicating the checks in
CoordinationState.setInitialState
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, except that these checks happen earlier.