diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java
new file mode 100644
index 0000000000000..64459eadda471
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.coordination;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Setting.Property;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.set.Sets;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Computes the optimal configuration of voting nodes in the cluster.
+ */
+public class Reconfigurator extends AbstractComponent {
+
+    /**
+     * The cluster usually requires a vote from at least half of the master nodes in order to commit a cluster state update, and to achieve
+     * this it makes automatic adjustments to the quorum size as master nodes join or leave the cluster. However, if master nodes leave the
+     * cluster slowly enough then these automatic adjustments can end up with a single master node; if this last node were to fail then the
+     * cluster would be rendered permanently unavailable. Instead it may be preferable to stop processing cluster state updates and become
+     * unavailable when the second-last (more generally, n'th-last) node leaves the cluster, so that the cluster is never in a situation
+     * where a single node's failure can cause permanent unavailability. This setting is the number of master node failures to tolerate:
+     * the voting configuration is never shrunk below {@code 2 * tolerance + 1} nodes.
+     */
+    public static final Setting<Integer> CLUSTER_MASTER_NODES_FAILURE_TOLERANCE =
+        Setting.intSetting("cluster.master_nodes_failure_tolerance", 0, 0, Property.NodeScope, Property.Dynamic);
+    // the default is not supposed to be important since we expect to set this setting explicitly at bootstrapping time
+    // TODO contemplate setting the default to something larger than 0 (1? 1<<30?)
+
+    private volatile int masterNodesFailureTolerance;
+
+    /**
+     * Creates a reconfigurator. The initial failure tolerance is read from {@code settings}, and an update consumer is registered on
+     * {@code clusterSettings} so that later changes to {@link #CLUSTER_MASTER_NODES_FAILURE_TOLERANCE} are picked up.
+     *
+     * @param settings        The settings from which the initial failure tolerance is read.
+     * @param clusterSettings The cluster settings on which the update consumer is registered.
+     */
+    public Reconfigurator(Settings settings, ClusterSettings clusterSettings) {
+        super(settings);
+        masterNodesFailureTolerance = CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.get(settings);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE, this::setMasterNodesFailureTolerance);
+    }
+
+    /**
+     * Sets the failure tolerance used by subsequent calls to {@link #reconfigure}; this is the consumer registered for setting updates.
+     *
+     * @param masterNodesFailureTolerance The new failure tolerance.
+     */
+    public void setMasterNodesFailureTolerance(int masterNodesFailureTolerance) {
+        this.masterNodesFailureTolerance = masterNodesFailureTolerance;
+    }
+
+    /**
+     * Returns {@code size} if it is odd and {@code size - 1} if it is even. Note that 0 maps to -1, which is harmless here because the
+     * only caller takes the maximum of the result and the safe configuration size, which is at least 1 for a non-negative tolerance.
+     */
+    private static int roundDownToOdd(int size) {
+        return size - (size % 2 == 0 ? 1 : 0);
+    }
+
+    @Override
+    public String toString() {
+        return "Reconfigurator{" +
+            "masterNodesFailureTolerance=" + masterNodesFailureTolerance +
+            '}';
+    }
+
+    /**
+     * Compute an optimal configuration for the cluster.
+     *
+     * @param liveNodes      The live nodes in the cluster. The optimal configuration prefers live nodes over non-live nodes as far as
+     *                       possible.
+     * @param retiredNodeIds Nodes that are leaving the cluster and which should not appear in the configuration if possible. Nodes that are
+     *                       retired and not in the current configuration will never appear in the resulting configuration; this is useful
+     *                       for shifting the vote in a 2-node cluster so one of the nodes can be restarted without harming availability.
+     * @param currentConfig  The current configuration. As far as possible, we prefer to keep the current config as-is.
+     * @return An optimal configuration, or the current configuration unchanged if the optimal configuration has no live quorum.
+     * @throws AssertionError if the current configuration has fewer than {@code 2 * masterNodesFailureTolerance + 1} nodes.
+     */
+    public ClusterState.VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String> retiredNodeIds,
+                                                        ClusterState.VotingConfiguration currentConfig) {
+        logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}", this, currentConfig, liveNodes, retiredNodeIds);
+
+        final int safeConfigurationSize = 2 * masterNodesFailureTolerance + 1;
+        if (currentConfig.getNodeIds().size() < safeConfigurationSize) {
+            throw new AssertionError(currentConfig + " is smaller than expected " + safeConfigurationSize);
+        }
+
+        /*
+         * There are three true/false properties of each node in play: live/non-live, retired/non-retired and in-config/not-in-config.
+         * Firstly we divide the nodes into disjoint sets based on these properties:
+         *
+         * - retiredInConfigNotLiveIds
+         * - nonRetiredInConfigNotLiveIds
+         * - retiredInConfigLiveIds
+         * - nonRetiredInConfigLiveIds
+         * - nonRetiredLiveNotInConfigIds
+         *
+         * The other 3 possibilities are not relevant:
+         * - retired, not-in-config, live              -- cannot add a retired node back to the config
+         * - retired, not-in-config, non-live          -- cannot add a retired node back to the config
+         * - non-retired, non-live, not-in-config      -- no evidence this node exists at all
+         */
+
+        final Set<String> liveNodeIds = liveNodes.stream()
+            .filter(DiscoveryNode::isMasterNode).map(DiscoveryNode::getId).collect(Collectors.toSet());
+        final Set<String> liveInConfigIds = new TreeSet<>(currentConfig.getNodeIds());
+        liveInConfigIds.retainAll(liveNodeIds);
+
+        final Set<String> inConfigNotLiveIds = Sets.sortedDifference(currentConfig.getNodeIds(), liveInConfigIds);
+        final Set<String> retiredInConfigNotLiveIds = new TreeSet<>(inConfigNotLiveIds);
+        retiredInConfigNotLiveIds.retainAll(retiredNodeIds);
+        final Set<String> nonRetiredInConfigNotLiveIds = new TreeSet<>(inConfigNotLiveIds);
+        nonRetiredInConfigNotLiveIds.removeAll(retiredInConfigNotLiveIds);
+
+        final Set<String> retiredInConfigLiveIds = new TreeSet<>(liveInConfigIds);
+        retiredInConfigLiveIds.retainAll(retiredNodeIds);
+        final Set<String> nonRetiredInConfigLiveIds = new TreeSet<>(liveInConfigIds);
+        nonRetiredInConfigLiveIds.removeAll(retiredInConfigLiveIds);
+
+        final Set<String> nonRetiredLiveNotInConfigIds = Sets.sortedDifference(liveNodeIds, currentConfig.getNodeIds());
+        nonRetiredLiveNotInConfigIds.removeAll(retiredNodeIds);
+
+        /*
+         * Now we work out how many nodes should be in the configuration:
+         */
+
+        // ideally we want the configuration to be all the non-retired live nodes ...
+        final int nonRetiredLiveNodeCount = nonRetiredInConfigLiveIds.size() + nonRetiredLiveNotInConfigIds.size();
+
+        // ... except one, if even, because odd configurations are slightly more resilient ...
+        final int votingNodeCount = roundDownToOdd(nonRetiredLiveNodeCount);
+
+        // ... except that the new configuration must satisfy CLUSTER_MASTER_NODES_FAILURE_TOLERANCE too:
+        final int targetSize = Math.max(votingNodeCount, safeConfigurationSize);
+
+        /*
+         * The new configuration is formed by taking this many nodes in the following preference order:
+         */
+        final ClusterState.VotingConfiguration newConfig = new ClusterState.VotingConfiguration(
+            Stream.of(nonRetiredInConfigLiveIds, nonRetiredLiveNotInConfigIds, // live nodes first, preferring the current config
+                retiredInConfigLiveIds, // if we need more, first use retired nodes that are still alive and haven't been removed yet
+                nonRetiredInConfigNotLiveIds, retiredInConfigNotLiveIds) // if we need more, use non-live nodes
+                .flatMap(Collection::stream).limit(targetSize).collect(Collectors.toSet()));
+
+        if (newConfig.hasQuorum(liveNodeIds)) {
+            return newConfig;
+        } else {
+            // If there are not enough live nodes to form a quorum in the newly-proposed configuration, it's better to do nothing.
+            return currentConfig;
+        }
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index 6bfd8d4d0f206..0789c50ac606c 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -34,6 +34,7 @@
 import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
 import org.elasticsearch.cluster.coordination.JoinHelper;
+import org.elasticsearch.cluster.coordination.Reconfigurator;
 import org.elasticsearch.cluster.metadata.IndexGraveyard;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.OperationRouting;
@@ -450,9 +451,10 @@ public void apply(Settings value, Settings current, Settings previous) {
                     ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING,
                     ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING,
                     ElectionSchedulerFactory.ELECTION_MAX_TIMEOUT_SETTING,
                     ElectionSchedulerFactory.ELECTION_DURATION_SETTING,
                     Coordinator.PUBLISH_TIMEOUT_SETTING,
-                    JoinHelper.JOIN_TIMEOUT_SETTING
+                    JoinHelper.JOIN_TIMEOUT_SETTING,
+                    Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE
             )));
 
     public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ReconfiguratorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ReconfiguratorTests.java
new file mode 100644
index 0000000000000..b94a3de854c69
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ReconfiguratorTests.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.coordination;
+
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptySet;
+import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.sameInstance;
+
+/**
+ * Tests {@link Reconfigurator}: worked examples, a randomized quorum property and dynamic updates of the failure tolerance setting.
+ */
+public class ReconfiguratorTests extends ESTestCase {
+
+    public void testReconfigurationExamples() {
+
+        check(nodes("a"), conf("a"), 0, conf("a"));
+        check(nodes("a", "b"), conf("a"), 0, conf("a"));
+        check(nodes("a", "b", "c"), conf("a"), 0, conf("a", "b", "c"));
+        check(nodes("a", "b", "c"), conf("a", "b"), 0, conf("a", "b", "c"));
+        check(nodes("a", "b"), conf("a", "b", "e"), 0, conf("a"));
+        check(nodes("a", "b"), conf("a", "b", "e"), 1, conf("a", "b", "e"));
+
+        for (int masterNodesFailureTolerance = 0; masterNodesFailureTolerance <= 1; masterNodesFailureTolerance++) {
+            check(nodes("a", "b", "c"), conf("a", "b", "c"), masterNodesFailureTolerance, conf("a", "b", "c"));
+            check(nodes("a", "b", "c", "d"), conf("a", "b", "c"), masterNodesFailureTolerance, conf("a", "b", "c"));
+            check(nodes("a", "b", "c", "d", "e"), conf("a", "b", "c"), masterNodesFailureTolerance, conf("a", "b", "c", "d", "e"));
+            check(nodes("a", "b", "c"), conf("a", "b", "e"), masterNodesFailureTolerance, conf("a", "b", "c"));
+            check(nodes("a", "b", "c", "d"), conf("a", "b", "e"), masterNodesFailureTolerance, conf("a", "b", "c"));
+            check(nodes("a", "b", "c", "d", "e"), conf("a", "f", "g"), masterNodesFailureTolerance, conf("a", "b", "c", "d", "e"));
+            check(nodes("a", "b", "c", "d"), conf("a", "b", "c", "d", "e"), masterNodesFailureTolerance, conf("a", "b", "c"));
+        }
+
+        check(nodes("a", "b", "c", "d"), conf("a", "b", "c", "d", "e"), 2, conf("a", "b", "c", "d", "e"));
+
+        // Retiring a single node shifts the votes elsewhere if possible.
+        check(nodes("a", "b"), retired("a"), conf("a"), 0, conf("b"));
+
+        // Retiring a node from a three-node cluster drops down to a one-node configuration if failure tolerance is 0
+        check(nodes("a", "b", "c"), retired("a"), conf("a"), 0, conf("b"));
+        check(nodes("a", "b", "c"), retired("a"), conf("a", "b", "c"), 0, conf("b"));
+
+        // Retiring is prevented in a three-node cluster if failure tolerance is 1
+        check(nodes("a", "b", "c"), retired("a"), conf("a", "b", "c"), 1, conf("a", "b", "c"));
+
+        // 7 nodes, one for each combination of live/retired/current. Ideally we want the config to be the non-retired live nodes.
+        // Since there are 2 non-retired live nodes we round down to 1 and just use the one that's already in the config.
+        check(nodes("a", "b", "c", "f"), retired("c", "e", "f", "g"), conf("a", "c", "d", "e"), 0, conf("a"));
+        // If we want the config to be at least 3 nodes then we don't retire "c" just yet.
+        check(nodes("a", "b", "c", "f"), retired("c", "e", "f", "g"), conf("a", "c", "d", "e"), 1, conf("a", "b", "c"));
+        // If we want the config to be at least 5 nodes then we keep "d" and "h".
+        check(nodes("a", "b", "c", "f"), retired("c", "e", "f", "g"), conf("a", "c", "d", "e", "h"), 2, conf("a", "b", "c", "d", "h"));
+    }
+
+    public void testReconfigurationProperty() {
+        final String[] allNodes = new String[]{"a", "b", "c", "d", "e", "f", "g"};
+
+        final String[] liveNodes = new String[randomIntBetween(1, allNodes.length)];
+        randomSubsetOf(liveNodes.length, allNodes).toArray(liveNodes);
+
+        final String[] initialVotingNodes = new String[randomIntBetween(1, allNodes.length)];
+        randomSubsetOf(initialVotingNodes.length, allNodes).toArray(initialVotingNodes);
+
+        final int masterNodesFailureTolerance = randomIntBetween(0, 2);
+
+        final Reconfigurator reconfigurator = makeReconfigurator(
+            Settings.builder().put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), masterNodesFailureTolerance).build());
+        final Set<DiscoveryNode> liveNodesSet = nodes(liveNodes);
+        final ClusterState.VotingConfiguration initialConfig = conf(initialVotingNodes);
+
+        // min configuration size comes from CLUSTER_MASTER_NODES_FAILURE_TOLERANCE as long as there are enough nodes in the current config
+
+        if (initialConfig.getNodeIds().size() >= masterNodesFailureTolerance * 2 + 1) {
+            // actual size of a quorum: half the configured nodes, which is all the live nodes plus maybe some dead ones to make up numbers
+            final int quorumSize = Math.max(liveNodes.length / 2 + 1, masterNodesFailureTolerance + 1);
+
+            final ClusterState.VotingConfiguration finalConfig = reconfigurator.reconfigure(liveNodesSet, emptySet(), initialConfig);
+
+            final String description = "reconfigure " + liveNodesSet + " from " + initialConfig + " with failure tolerance of " +
+                masterNodesFailureTolerance + " yielded " + finalConfig;
+
+            if (quorumSize > liveNodes.length) {
+                assertFalse(description + " without a live quorum", finalConfig.hasQuorum(Arrays.asList(liveNodes)));
+            } else {
+                final List<String> expectedQuorum = randomSubsetOf(quorumSize, liveNodes);
+                assertTrue(description + " with quorum[" + quorumSize + "] of " + expectedQuorum, finalConfig.hasQuorum(expectedQuorum));
+            }
+        } else {
+            assertThat(expectThrows(AssertionError.class,
+                () -> reconfigurator.reconfigure(liveNodesSet, emptySet(), initialConfig)).getMessage(),
+                containsString("is smaller than expected"));
+        }
+    }
+
+    /** Builds a voting configuration consisting of the given node ids. */
+    private ClusterState.VotingConfiguration conf(String... nodes) {
+        return new ClusterState.VotingConfiguration(Sets.newHashSet(nodes));
+    }
+
+    /** Builds a set of nodes with the given ids, each created with a new fake transport address and {@code Version.CURRENT}. */
+    private Set<DiscoveryNode> nodes(String... nodes) {
+        final Set<DiscoveryNode> liveNodes = new HashSet<>();
+        for (String id : nodes) {
+            liveNodes.add(new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT));
+        }
+        return liveNodes;
+    }
+
+    /** Collects the given node ids into a set; called with no arguments it yields an empty set of retired node ids. */
+    private Set<String> retired(String... nodes) {
+        return Arrays.stream(nodes).collect(Collectors.toSet());
+    }
+
+    /** Checks a reconfiguration in which no nodes are retired; delegates to the five-argument overload. */
+    private void check(Set<DiscoveryNode> liveNodes, ClusterState.VotingConfiguration config, int masterNodesFailureTolerance,
+                       ClusterState.VotingConfiguration expectedConfig) {
+        check(liveNodes, retired(), config, masterNodesFailureTolerance, expectedConfig);
+    }
+
+    /** Asserts that {@code reconfigure} turns {@code config} into {@code expectedConfig} for the given inputs. */
+    private void check(Set<DiscoveryNode> liveNodes, Set<String> retired, ClusterState.VotingConfiguration config,
+                       int masterNodesFailureTolerance, ClusterState.VotingConfiguration expectedConfig) {
+        final Reconfigurator reconfigurator = makeReconfigurator(Settings.builder()
+            .put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), masterNodesFailureTolerance)
+            .build());
+
+        final ClusterState.VotingConfiguration adaptedConfig = reconfigurator.reconfigure(liveNodes, retired, config);
+        assertEquals(new ParameterizedMessage("[liveNodes={}, retired={}, config={}, masterNodesFailureTolerance={}]",
+                liveNodes, retired, config, masterNodesFailureTolerance).getFormattedMessage(),
+            expectedConfig, adaptedConfig);
+    }
+
+    /** Creates a reconfigurator from the given settings, using cluster settings built over the built-in cluster settings. */
+    private Reconfigurator makeReconfigurator(Settings settings) {
+        return new Reconfigurator(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
+    }
+
+    public void testDynamicSetting() {
+        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        final Reconfigurator reconfigurator = new Reconfigurator(Settings.EMPTY, clusterSettings);
+        final VotingConfiguration initialConfig = conf("a", "b", "c", "d", "e");
+
+        // default is "0"
+        assertThat(reconfigurator.reconfigure(nodes("a"), retired(), initialConfig), equalTo(conf("a")));
+
+        // update to "2"
+        clusterSettings.applySettings(Settings.builder().put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), "2").build());
+        assertThat(reconfigurator.reconfigure(nodes("a"), retired(), initialConfig), sameInstance(initialConfig)); // cannot reconfigure
+        assertThat(reconfigurator.reconfigure(nodes("a", "b", "c"), retired(), initialConfig), equalTo(conf("a", "b", "c", "d", "e")));
+
+        // update to "1"
+        clusterSettings.applySettings(Settings.builder().put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), "1").build());
+        assertThat(reconfigurator.reconfigure(nodes("a"), retired(), initialConfig), sameInstance(initialConfig)); // cannot reconfigure
+        assertThat(reconfigurator.reconfigure(nodes("a", "b", "c"), retired(), initialConfig), equalTo(conf("a", "b", "c")));
+
+        // explicitly set to "0"
+        clusterSettings.applySettings(Settings.builder().put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), "0").build());
+        assertThat(reconfigurator.reconfigure(nodes("a"), retired(), initialConfig), equalTo(conf("a")));
+
+        expectThrows(IllegalArgumentException.class, () ->
+            clusterSettings.applySettings(Settings.builder().put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(), "-1").build()));
+    }
+}