diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 1ce156b853682..1256021b96e99 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -54,8 +54,8 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.env.Environment; diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 040066adeb6b1..b41316b65345d 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -19,7 +19,6 @@ package org.elasticsearch.discovery; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Setting; @@ -27,8 +26,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.ExtensionPoint; import org.elasticsearch.discovery.local.LocalDiscovery; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingService; import 
org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java b/core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java similarity index 61% rename from core/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java rename to core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java index 3ef9138f933b9..1d11f5cf0f569 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java @@ -17,11 +17,10 @@ * under the License. */ -package org.elasticsearch.discovery.zen.elect; +package org.elasticsearch.discovery.zen; import com.carrotsearch.hppc.ObjectContainer; import org.apache.lucene.util.CollectionUtil; -import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; @@ -33,9 +32,11 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; +import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; /** * @@ -45,17 +46,64 @@ public class ElectMasterService extends AbstractComponent { public static final Setting DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING = Setting.intSetting("discovery.zen.minimum_master_nodes", -1, Property.Dynamic, Property.NodeScope); - // This is the minimum version a master needs to be on, otherwise it gets ignored - // This is based on the minimum compatible version of the current version this node is on - private final Version minMasterVersion; - private final NodeComparator nodeComparator = new NodeComparator(); - private volatile int minimumMasterNodes; + /** + * a class to encapsulate all the information about a candidate in a master election + 
+ * that is needed to decide which of the candidates should win + */ + public static class MasterCandidate { + + public static final long UNRECOVERED_CLUSTER_VERSION = -1; + + final DiscoveryNode node; + + final long clusterStateVersion; + + public MasterCandidate(DiscoveryNode node, long clusterStateVersion) { + Objects.requireNonNull(node); + assert clusterStateVersion >= -1 : "got: " + clusterStateVersion; + assert node.isMasterNode(); + this.node = node; + this.clusterStateVersion = clusterStateVersion; + } + + public DiscoveryNode getNode() { + return node; + } + + public long getClusterStateVersion() { + return clusterStateVersion; + } + + @Override + public String toString() { + return "Candidate{" + + "node=" + node + + ", clusterStateVersion=" + clusterStateVersion + + '}'; + } + + /** + * compares two candidates to indicate which is the better master. + * A higher cluster state version is better + * + * @return a negative value if c1 is the better candidate, a positive value if c2 is. + */ + public static int compare(MasterCandidate c1, MasterCandidate c2) { + // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted + // list, so if c2 has a higher cluster state version, it needs to come first. 
+ int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion); + if (ret == 0) { + ret = compareNodes(c1.getNode(), c2.getNode()); + } + return ret; + } + } + @Inject public ElectMasterService(Settings settings) { super(settings); - this.minMasterVersion = Version.CURRENT.minimumCompatibilityVersion(); this.minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings); logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes); } @@ -69,16 +117,41 @@ public int minimumMasterNodes() { } public boolean hasEnoughMasterNodes(Iterable nodes) { - if (minimumMasterNodes < 1) { - return true; - } int count = 0; for (DiscoveryNode node : nodes) { if (node.isMasterNode()) { count++; } } - return count >= minimumMasterNodes; + return count > 0 && (minimumMasterNodes < 0 || count >= minimumMasterNodes); + } + + public boolean hasEnoughCandidates(Collection candidates) { + if (candidates.isEmpty()) { + return false; + } + if (minimumMasterNodes < 1) { + return true; + } + assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() : + "duplicates ahead: " + candidates; + return candidates.size() >= minimumMasterNodes; + } + + /** + * Elects a new master out of the given candidates, returning the best one. Callers must + * ensure {@link #hasEnoughCandidates(Collection)} holds, as it is only checked by an assertion. 
+ */ + public MasterCandidate electMaster(Collection candidates) { + assert hasEnoughCandidates(candidates); + List sortedCandidates = new ArrayList<>(candidates); + sortedCandidates.sort(MasterCandidate::compare); + return sortedCandidates.get(0); + } + + /** selects the best active master to join, where multiple are discovered */ + public DiscoveryNode tieBreakActiveMasters(Collection activeMasters) { + return activeMasters.stream().min(ElectMasterService::compareNodes).get(); } public boolean hasTooManyMasterNodes(Iterable nodes) { @@ -107,7 +180,7 @@ public void logMinimumMasterNodesWarningIfNecessary(ClusterState oldState, Clust */ public List sortByMasterLikelihood(Iterable nodes) { ArrayList sortedNodes = CollectionUtils.iterableAsArrayList(nodes); - CollectionUtil.introSort(sortedNodes, nodeComparator); + CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes); return sortedNodes; } @@ -130,25 +203,6 @@ public DiscoveryNode[] nextPossibleMasters(ObjectContainer nodes, return nextPossibleMasters.toArray(new DiscoveryNode[nextPossibleMasters.size()]); } - /** - * Elects a new master out of the possible nodes, returning it. Returns null - * if no master has been elected. - */ - public DiscoveryNode electMaster(Iterable nodes) { - List sortedNodes = sortedMasterNodes(nodes); - if (sortedNodes == null || sortedNodes.isEmpty()) { - return null; - } - DiscoveryNode masterNode = sortedNodes.get(0); - // Sanity check: maybe we don't end up here, because serialization may have failed. 
- if (masterNode.getVersion().before(minMasterVersion)) { - logger.warn("ignoring master [{}], because the version [{}] is lower than the minimum compatible version [{}]", masterNode, masterNode.getVersion(), minMasterVersion); - return null; - } else { - return masterNode; - } - } - private List sortedMasterNodes(Iterable nodes) { List possibleNodes = CollectionUtils.iterableAsArrayList(nodes); if (possibleNodes.isEmpty()) { @@ -161,21 +215,18 @@ private List sortedMasterNodes(Iterable nodes) { it.remove(); } } - CollectionUtil.introSort(possibleNodes, nodeComparator); + CollectionUtil.introSort(possibleNodes, ElectMasterService::compareNodes); return possibleNodes; } - private static class NodeComparator implements Comparator { - - @Override - public int compare(DiscoveryNode o1, DiscoveryNode o2) { - if (o1.isMasterNode() && !o2.isMasterNode()) { - return -1; - } - if (!o1.isMasterNode() && o2.isMasterNode()) { - return 1; - } - return o1.getId().compareTo(o2.getId()); + /** master nodes go before other nodes, with a secondary sort by id **/ + private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) { + if (o1.isMasterNode() && !o2.isMasterNode()) { + return -1; + } + if (!o1.isMasterNode() && o2.isMasterNode()) { + return 1; } + return o1.getId().compareTo(o2.getId()); } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index 6f0b8966d0916..bf8559fb9495a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -41,7 +41,6 @@ import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.membership.MembershipAction; 
import java.util.ArrayList; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 132505fb40377..43739a2f4106f 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -24,7 +24,6 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -56,7 +55,6 @@ import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; @@ -76,13 +74,10 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -146,9 +141,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover private final JoinThreadControl joinThreadControl; - /** counts the time this node has joined the cluster or have elected it self as master */ - private final AtomicLong clusterJoinsCounter = new AtomicLong(); - // must initialized in doStart(), when we have the allocationService set private volatile 
NodeJoinController nodeJoinController; private volatile NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; @@ -304,8 +296,8 @@ public DiscoveryNodes nodes() { } @Override - public boolean nodeHasJoinedClusterOnce() { - return clusterJoinsCounter.get() > 0; + public ClusterState clusterState() { + return clusterService.state(); } /** end of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */ @@ -406,8 +398,6 @@ public void onElectedAsMaster(ClusterState state) { joinThreadControl.markThreadAsDone(currentThread); // we only starts nodesFD if we are master (it may be that we received a cluster state while pinging) nodesFD.updateNodesAndPing(state); // start the nodes FD - long count = clusterJoinsCounter.incrementAndGet(); - logger.trace("cluster joins counter set to [{}] (elected as master)", count); } @Override @@ -764,9 +754,6 @@ public ClusterState execute(ClusterState currentState) { if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { // its a fresh update from the master as we transition from a start of not having a master to having one logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId()); - long count = clusterJoinsCounter.incrementAndGet(); - logger.trace("updated cluster join cluster to [{}]", count); - return newClusterState; } @@ -873,16 +860,6 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final } else if (nodeJoinController == null) { throw new IllegalStateException("discovery module is not yet started"); } else { - // The minimum supported version for a node joining a master: - Version minimumNodeJoinVersion = localNode().getVersion().minimumCompatibilityVersion(); - // Sanity check: maybe we don't end up here, because serialization may have failed. 
- if (node.getVersion().before(minimumNodeJoinVersion)) { - callback.onFailure( - new IllegalStateException("Can't handle join request from a node with a version [" + node.getVersion() + "] that is lower than the minimum compatible version [" + minimumNodeJoinVersion.minimumCompatibilityVersion() + "]") - ); - return; - } - // try and connect to the node, if it fails, we can raise an exception back to the client... transportService.connectToNode(node); @@ -901,14 +878,14 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final private DiscoveryNode findMaster() { logger.trace("starting to ping"); - ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout); + List fullPingResponses = pingService.pingAndWait(pingTimeout).toList(); if (fullPingResponses == null) { logger.trace("No full ping responses"); return null; } if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); - if (fullPingResponses.length == 0) { + if (fullPingResponses.size() == 0) { sb.append(" {none}"); } else { for (ZenPing.PingResponse pingResponse : fullPingResponses) { @@ -918,69 +895,57 @@ private DiscoveryNode findMaster() { logger.trace("full ping responses:{}", sb); } + final DiscoveryNode localNode = clusterService.localNode(); + + // add our selves + assert fullPingResponses.stream().map(ZenPing.PingResponse::node) + .filter(n -> n.equals(localNode)).findAny().isPresent() == false; + + fullPingResponses.add(new ZenPing.PingResponse(localNode, null, clusterService.state())); + // filter responses final List pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger); - final DiscoveryNode localNode = clusterService.localNode(); - List pingMasters = new ArrayList<>(); + List activeMasters = new ArrayList<>(); for (ZenPing.PingResponse pingResponse : pingResponses) { - if (pingResponse.master() != null) { - // We can't include the local node in pingMasters list, otherwise we may up electing 
ourselves without - // any check / verifications from other nodes in ZenDiscover#innerJoinCluster() - if (!localNode.equals(pingResponse.master())) { - pingMasters.add(pingResponse.master()); - } + // We can't include the local node in the activeMasters list, otherwise we may end up electing ourselves without + // any check / verifications from other nodes in ZenDiscovery#innerJoinCluster() + if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) { + activeMasters.add(pingResponse.master()); } } // nodes discovered during pinging - Set activeNodes = new HashSet<>(); - // nodes discovered who has previously been part of the cluster and do not ping for the very first time - Set joinedOnceActiveNodes = new HashSet<>(); - if (localNode.isMasterNode()) { - activeNodes.add(localNode); - long joinsCounter = clusterJoinsCounter.get(); - if (joinsCounter > 0) { - logger.trace("adding local node to the list of active nodes that have previously joined the cluster (joins counter is [{}])", joinsCounter); - joinedOnceActiveNodes.add(localNode); - } - } + List masterCandidates = new ArrayList<>(); for (ZenPing.PingResponse pingResponse : pingResponses) { - activeNodes.add(pingResponse.node()); - if (pingResponse.hasJoinedOnce()) { - joinedOnceActiveNodes.add(pingResponse.node()); + if (pingResponse.node().isMasterNode()) { + masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion())); } } - if (pingMasters.isEmpty()) { - if (electMaster.hasEnoughMasterNodes(activeNodes)) { - // we give preference to nodes who have previously already joined the cluster. 
Those will - // have a cluster state in memory, including an up to date routing table (which is not persistent to disk - // by the gateway) - DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes); - if (master != null) { - return master; - } - return electMaster.electMaster(activeNodes); + if (activeMasters.isEmpty()) { + if (electMaster.hasEnoughCandidates(masterCandidates)) { + final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates); + logger.trace("candidate {} won election", winner); + return winner.getNode(); } else { // if we don't have enough master nodes, we bail, because there are not enough master to elect from - logger.trace("not enough master nodes [{}]", activeNodes); + logger.trace("not enough master nodes [{}]", masterCandidates); return null; } } else { - - assert !pingMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master"; + assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master"; // lets tie break between discovered nodes - return electMaster.electMaster(pingMasters); + return electMaster.tieBreakActiveMasters(activeMasters); } } - static List filterPingResponses(ZenPing.PingResponse[] fullPingResponses, boolean masterElectionIgnoreNonMasters, Logger logger) { + static List filterPingResponses(List fullPingResponses, boolean masterElectionIgnoreNonMasters, Logger logger) { List pingResponses; if (masterElectionIgnoreNonMasters) { - pingResponses = Arrays.stream(fullPingResponses).filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList()); + pingResponses = fullPingResponses.stream().filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList()); } else { - pingResponses = Arrays.asList(fullPingResponses); + pingResponses = fullPingResponses; } if (logger.isDebugEnabled()) { diff --git 
a/core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java index 568bc3ec16d75..0bcc8b37d882a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/PingContextProvider.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen.ping; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; /** @@ -26,7 +27,7 @@ */ public interface PingContextProvider extends DiscoveryNodesProvider { - /** return true if this node has previously joined the cluster at least once. False if this is first join */ - boolean nodeHasJoinedClusterOnce(); + /** return the current cluster state of the node */ + ClusterState clusterState(); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java index 5a9f5f463e236..b4bb61ad461e3 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java @@ -20,30 +20,42 @@ package org.elasticsearch.discovery.zen.ping; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.zen.ElectMasterService; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import 
static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; + public interface ZenPing extends LifecycleComponent { void setPingContextProvider(PingContextProvider contextProvider); void ping(PingListener listener, TimeValue timeout); - public interface PingListener { + interface PingListener { - void onPing(PingResponse[] pings); + /** + * called when pinging is done. + * + * @param pings the ping responses that were collected + */ + void onPing(Collection pings); } - public static class PingResponse implements Streamable { + class PingResponse implements Streamable { public static final PingResponse[] EMPTY = new PingResponse[0]; @@ -59,29 +71,36 @@ public static class PingResponse implements Streamable { private DiscoveryNode master; - private boolean hasJoinedOnce; + private long clusterStateVersion; private PingResponse() { } /** - * @param node the node which this ping describes - * @param master the current master of the node - * @param clusterName the cluster name of the node - * @param hasJoinedOnce true if the joined has successfully joined the cluster before + * @param node the node which this ping describes + * @param master the current master of the node + * @param clusterName the cluster name of the node + * @param clusterStateVersion the current cluster state version of that node + * ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered) */ - public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, boolean hasJoinedOnce) { + public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) { this.id = idGenerator.incrementAndGet(); this.node = node; this.master = master; this.clusterName = clusterName; - this.hasJoinedOnce = hasJoinedOnce; + this.clusterStateVersion = clusterStateVersion; } - /** - * an always increasing unique identifier for this ping response. - * lower values means older pings. 
- */ + public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterState state) { + this(node, master, state.getClusterName(), + state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) ? + ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION : state.version()); + } + + /** + * an always increasing unique identifier for this ping response. + * lower values means older pings. + */ public long id() { return this.id; } @@ -100,9 +119,11 @@ public DiscoveryNode master() { return master; } - /** true if the joined has successfully joined the cluster before */ - public boolean hasJoinedOnce() { - return hasJoinedOnce; + /** + * the current cluster state version of that node ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} + * for not recovered) */ + public long getClusterStateVersion() { + return clusterStateVersion; } public static PingResponse readPingResponse(StreamInput in) throws IOException { @@ -118,7 +139,7 @@ public void readFrom(StreamInput in) throws IOException { if (in.readBoolean()) { master = new DiscoveryNode(in); } - this.hasJoinedOnce = in.readBoolean(); + this.clusterStateVersion = in.readLong(); this.id = in.readLong(); } @@ -132,13 +153,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(true); master.writeTo(out); } - out.writeBoolean(hasJoinedOnce); + out.writeLong(clusterStateVersion); out.writeLong(id); } @Override public String toString() { - return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "], hasJoinedOnce [" + hasJoinedOnce + "], cluster_name[" + clusterName.value() + "]}"; + return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "], cluster_state_version [" + clusterStateVersion + + "], cluster_name[" + clusterName.value() + "]}"; } } @@ -146,7 +168,7 @@ public String toString() { /** * a utility collection of pings where only the most recent ping is stored per node */ - public static class PingCollection { + class 
PingCollection { Map pings; @@ -171,15 +193,15 @@ public synchronized boolean addPing(PingResponse ping) { } /** adds multiple pings if newer than previous pings from the same node */ - public synchronized void addPings(PingResponse[] pings) { + public synchronized void addPings(Iterable pings) { for (PingResponse ping : pings) { addPing(ping); } } - /** serialize current pings to an array */ - public synchronized PingResponse[] toArray() { - return pings.values().toArray(new PingResponse[pings.size()]); + /** serialize current pings to a list. It is guaranteed that the list contains one ping response per node */ + public synchronized List toList() { + return new ArrayList<>(pings.values()); } /** the number of nodes for which there are known pings */ diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java index bd5855666aca2..3a2ddc10cfbc8 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java @@ -23,17 +23,15 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; -public class ZenPingService extends AbstractLifecycleComponent implements ZenPing { +public class ZenPingService extends AbstractLifecycleComponent { private List zenPings = Collections.emptyList(); @@ -47,7 +45,6 @@ public List zenPings() { return this.zenPings; } - @Override public void 
setPingContextProvider(PingContextProvider contextProvider) { if (lifecycle.started()) { throw new IllegalStateException("Can't set nodes provider when started"); @@ -78,60 +75,31 @@ protected void doClose() { } } - public PingResponse[] pingAndWait(TimeValue timeout) { - final AtomicReference response = new AtomicReference<>(); - final CountDownLatch latch = new CountDownLatch(1); - ping(new PingListener() { - @Override - public void onPing(PingResponse[] pings) { - response.set(pings); - latch.countDown(); + public ZenPing.PingCollection pingAndWait(TimeValue timeout) { + final ZenPing.PingCollection response = new ZenPing.PingCollection(); + final CountDownLatch latch = new CountDownLatch(zenPings.size()); + for (ZenPing zenPing : zenPings) { + final AtomicBoolean counted = new AtomicBoolean(); + try { + zenPing.ping(pings -> { + response.addPings(pings); + if (counted.compareAndSet(false, true)) { + latch.countDown(); + } + }, timeout); + } catch (Exception ex) { + logger.warn("Ping execution failed", ex); + if (counted.compareAndSet(false, true)) { + latch.countDown(); + } } - }, timeout); + } try { latch.await(); - return response.get(); + return response; } catch (InterruptedException e) { logger.trace("pingAndWait interrupted"); - return null; - } - } - - @Override - public void ping(PingListener listener, TimeValue timeout) { - List zenPings = this.zenPings; - CompoundPingListener compoundPingListener = new CompoundPingListener(listener, zenPings); - for (ZenPing zenPing : zenPings) { - try { - zenPing.ping(compoundPingListener, timeout); - } catch (EsRejectedExecutionException ex) { - logger.debug("Ping execution rejected", ex); - compoundPingListener.onPing(null); - } - } - } - - private static class CompoundPingListener implements PingListener { - - private final PingListener listener; - - private final AtomicInteger counter; - - private PingCollection responses = new PingCollection(); - - private CompoundPingListener(PingListener listener, List 
zenPings) { - this.listener = listener; - this.counter = new AtomicInteger(zenPings.size()); - } - - @Override - public void onPing(PingResponse[] pings) { - if (pings != null) { - responses.addPings(pings); - } - if (counter.decrementAndGet() == 0) { - listener.onPing(responses.toArray()); - } + return response; } } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index afe4902f887a7..637730c75fd76 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -44,7 +44,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.threadpool.ThreadPool; @@ -63,6 +63,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -236,8 +237,9 @@ public void clearTemporalResponses() { temporalResponses.clear(); } - public PingResponse[] pingAndWait(TimeValue duration) { - final AtomicReference response = new AtomicReference<>(); + // test only + Collection pingAndWait(TimeValue duration) { + final AtomicReference> response = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); ping(pings -> { response.set(pings); @@ -273,7 +275,7 @@ protected void doRun() { protected void doRun() throws Exception { sendPings(duration, 
TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler); sendPingsHandler.close(); - listener.onPing(sendPingsHandler.pingCollection().toArray()); + listener.onPing(sendPingsHandler.pingCollection().toList()); for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node); transportService.disconnectFromNode(node); @@ -576,8 +578,7 @@ public void writeTo(StreamOutput out) throws IOException { } private PingResponse createPingResponse(DiscoveryNodes discoNodes) { - return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), clusterName, - contextProvider.nodeHasJoinedClusterOnce()); + return new PingResponse(discoNodes.getLocalNode(), discoNodes.getMasterNode(), contextProvider.clusterState()); } static class UnicastPingResponse extends TransportResponse { diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index b30a343547973..87f86c3f596d2 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 07c1e5dd8da3e..2e86cb5b896f8 100644 --- 
a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -29,8 +29,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java index 98c7b1a3d67a0..31e841227b866 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java @@ -30,8 +30,8 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java index 057b54c7a078f..3b436f4541097 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java @@ -18,13 +18,10 @@ */ package org.elasticsearch.discovery; -import org.elasticsearch.Version; import org.elasticsearch.common.inject.ModuleTestCase; import 
org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.local.LocalDiscovery; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; -import org.elasticsearch.node.Node; import org.elasticsearch.test.NoopDiscovery; /** diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index f04db89796c38..b78b1d923b9f4 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -49,8 +49,8 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.discovery.zen.ping.ZenPing; @@ -110,9 +110,12 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; @@ 
-164,7 +167,7 @@ private List startCluster(int numberOfNodes, int minimumMasterNode) thro private List startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws ExecutionException, InterruptedException { - configureUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode); + configureCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode); List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); ensureStableCluster(numberOfNodes); @@ -196,15 +199,15 @@ protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class); } - private void configureUnicastCluster( + private void configureCluster( int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode ) throws ExecutionException, InterruptedException { - configureUnicastCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode); + configureCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode); } - private void configureUnicastCluster( + private void configureCluster( Settings settings, int numberOfNodes, @Nullable int[] unicastHostsOrdinals, @@ -1031,7 +1034,7 @@ public void onFailure(Exception e) { } public void testClusterFormingWithASlowNode() throws Exception { - configureUnicastCluster(3, null, 2); + configureCluster(3, null, 2); SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000); @@ -1094,7 +1097,7 @@ public void testNodeNotReachableFromMaster() throws Exception { */ public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception { // don't use DEFAULT settings (which can cause node disconnects on a slow CI machine) - configureUnicastCluster(Settings.EMPTY, 3, null, 1); + configureCluster(Settings.EMPTY, 3, null, 1); InternalTestCluster.Async masterNodeFuture = internalCluster().startMasterOnlyNodeAsync(); InternalTestCluster.Async node_1Future = 
internalCluster().startDataOnlyNodeAsync(); @@ -1135,7 +1138,7 @@ public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Excep public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception { // test for https://github.com/elastic/elasticsearch/issues/8823 - configureUnicastCluster(2, null, 1); + configureCluster(2, null, 1); String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY); internalCluster().startDataOnlyNode(Settings.EMPTY); @@ -1166,7 +1169,7 @@ public void testIndicesDeleted() throws Exception { .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed .build(); final String idxName = "test"; - configureUnicastCluster(settings, 3, null, 2); + configureCluster(settings, 3, null, 2); InternalTestCluster.Async> masterNodes = internalCluster().startMasterOnlyNodesAsync(2); InternalTestCluster.Async dataNode = internalCluster().startDataOnlyNodeAsync(); dataNode.get(); @@ -1195,6 +1198,61 @@ public void testIndicesDeleted() throws Exception { assertFalse(client().admin().indices().prepareExists(idxName).get().isExists()); } + public void testElectMasterWithLatestVersion() throws Exception { + configureCluster(3, null, 2); + final Set nodes = new HashSet<>(internalCluster().startNodesAsync(3).get()); + ensureStableCluster(3); + ServiceDisruptionScheme isolateAllNodes = new NetworkDisruption(new NetworkDisruption.IsolateAllNodes(nodes), new NetworkDisconnect()); + internalCluster().setDisruptionScheme(isolateAllNodes); + + logger.info("--> forcing a complete election to make sure \"preferred\" master is elected"); + isolateAllNodes.startDisrupting(); + for (String node: nodes) { + assertNoMaster(node); + } + isolateAllNodes.stopDisrupting(); + ensureStableCluster(3); + final String preferredMasterName = internalCluster().getMasterName(); + final DiscoveryNode preferredMaster = internalCluster().clusterService(preferredMasterName).localNode(); + for 
(String node: nodes) { + DiscoveryNode discoveryNode = internalCluster().clusterService(node).localNode(); + assertThat(discoveryNode.getId(), greaterThanOrEqualTo(preferredMaster.getId())); + } + + logger.info("--> preferred master is {}", preferredMaster); + final Set nonPreferredNodes = new HashSet<>(nodes); + nonPreferredNodes.remove(preferredMasterName); + final ServiceDisruptionScheme isolatePreferredMaster = + new NetworkDisruption( + new NetworkDisruption.TwoPartitions( + Collections.singleton(preferredMasterName), nonPreferredNodes), + new NetworkDisconnect()); + internalCluster().setDisruptionScheme(isolatePreferredMaster); + isolatePreferredMaster.startDisrupting(); + + assertAcked(client(randomFrom(nonPreferredNodes)).admin().indices().prepareCreate("test").setSettings( + INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1, + INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0 + )); + + internalCluster().clearDisruptionScheme(false); + internalCluster().setDisruptionScheme(isolateAllNodes); + + logger.info("--> forcing a complete election again"); + isolateAllNodes.startDisrupting(); + for (String node: nodes) { + assertNoMaster(node); + } + + isolateAllNodes.stopDisrupting(); + + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + if (state.metaData().hasIndex("test") == false) { + fail("index 'test' was lost. 
current cluster state: " + state.prettyPrint()); + } + + } + protected NetworkDisruption addRandomDisruptionType(TwoPartitions partitions) { final NetworkLinkDisruptionType disruptionType; if (randomBoolean()) { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTests.java index b31b0cbaa55e0..737607df6be3c 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ElectMasterServiceTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService.MasterCandidate; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; @@ -31,6 +31,10 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class ElectMasterServiceTests extends ESTestCase { @@ -55,6 +59,22 @@ List generateRandomNodes() { return nodes; } + List generateRandomCandidates() { + int count = scaledRandomIntBetween(1, 100); + ArrayList candidates = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + Set roles = new HashSet<>(); + roles.add(DiscoveryNode.Role.MASTER); + DiscoveryNode node = new DiscoveryNode("n_" + i, "n_" + i, LocalTransportAddress.buildUnique(), Collections.emptyMap(), + roles, Version.CURRENT); + candidates.add(new MasterCandidate(node, randomBoolean() ? 
MasterCandidate.UNRECOVERED_CLUSTER_VERSION : randomPositiveLong())); + } + + Collections.shuffle(candidates, random()); + return candidates; + } + + public void testSortByMasterLikelihood() { List nodes = generateRandomNodes(); List sortedNodes = electMasterService().sortByMasterLikelihood(nodes); @@ -69,36 +89,53 @@ public void testSortByMasterLikelihood() { } prevNode = node; } + } + public void testTieBreakActiveMasters() { + List nodes = generateRandomCandidates().stream().map(MasterCandidate::getNode).collect(Collectors.toList()); + DiscoveryNode bestMaster = electMasterService().tieBreakActiveMasters(nodes); + for (DiscoveryNode node: nodes) { + if (node.equals(bestMaster) == false) { + assertTrue(bestMaster.getId().compareTo(node.getId()) < 0); + } + } } - public void testElectMaster() { - List nodes = generateRandomNodes(); + public void testHasEnoughNodes() { + List nodes = rarely() ? Collections.emptyList() : generateRandomNodes(); ElectMasterService service = electMasterService(); - int min_master_nodes = randomIntBetween(0, nodes.size()); - service.minimumMasterNodes(min_master_nodes); + int masterNodes = (int) nodes.stream().filter(DiscoveryNode::isMasterNode).count(); + service.minimumMasterNodes(randomIntBetween(-1, masterNodes)); + assertThat(service.hasEnoughMasterNodes(nodes), equalTo(masterNodes > 0)); + service.minimumMasterNodes(masterNodes + 1 + randomIntBetween(0, nodes.size())); + assertFalse(service.hasEnoughMasterNodes(nodes)); + } - int master_nodes = 0; - for (DiscoveryNode node : nodes) { - if (node.isMasterNode()) { - master_nodes++; - } - } - DiscoveryNode master = null; - if (service.hasEnoughMasterNodes(nodes)) { - master = service.electMaster(nodes); - } + public void testHasEnoughCandidates() { + List candidates = rarely() ? 
Collections.emptyList() : generateRandomCandidates(); + ElectMasterService service = electMasterService(); + service.minimumMasterNodes(randomIntBetween(-1, candidates.size())); + assertThat(service.hasEnoughCandidates(candidates), equalTo(candidates.size() > 0)); + service.minimumMasterNodes(candidates.size() + 1 + randomIntBetween(0, candidates.size())); + assertFalse(service.hasEnoughCandidates(candidates)); + } - if (master_nodes == 0) { - assertNull(master); - } else if (min_master_nodes > 0 && master_nodes < min_master_nodes) { - assertNull(master); - } else { - assertNotNull(master); - for (DiscoveryNode node : nodes) { - if (node.isMasterNode()) { - assertTrue(master.getId().compareTo(node.getId()) <= 0); - } + public void testElectMaster() { + List candidates = generateRandomCandidates(); + ElectMasterService service = electMasterService(); + int minMasterNodes = randomIntBetween(0, candidates.size()); + service.minimumMasterNodes(minMasterNodes); + MasterCandidate master = service.electMaster(candidates); + assertNotNull(master); + for (MasterCandidate candidate : candidates) { + if (candidate.getNode().equals(master.getNode())) { + // nothing much to test here + } else if (candidate.getClusterStateVersion() == master.getClusterStateVersion()) { + assertThat("candidate " + candidate + " has a lower or equal id than master " + master, candidate.getNode().getId(), + greaterThan(master.getNode().getId())); + } else { + assertThat("candidate " + master + " has a higher cluster state version than candidate " + candidate, + master.getClusterStateVersion(), greaterThan(candidate.getClusterStateVersion())); } } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index 0acba3c420f27..ca75ea960ad1d 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ 
b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -43,7 +43,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java index 35335a8ede4ce..1e8954330cd26 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeRemovalClusterStateTaskExecutorTests.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.transport.LocalTransportAddress; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index b7aaf2795824f..d9a8c9be7f497 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -34,14 +34,12 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress; import 
org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; @@ -60,10 +58,8 @@ import org.junit.Before; import java.io.IOException; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -77,8 +73,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) @ESIntegTestCase.SuppressLocalMode @@ -293,44 +287,6 @@ public EnumSet context() { } } - public void testHandleNodeJoin_incompatibleMinVersion() throws UnknownHostException { - Settings nodeSettings = Settings.builder() - .put("discovery.type", "zen") // <-- To override the local setting if set externally - .build(); - String nodeName = internalCluster().startNode(nodeSettings); - ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName); - ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName); - DiscoveryNode node = new DiscoveryNode("_node_id", new InetSocketTransportAddress(InetAddress.getByName("0.0.0.0"), 0), - emptyMap(), emptySet(), previousMajorVersion); - final AtomicReference holder = new AtomicReference<>(); - 
zenDiscovery.handleJoinRequest(node, clusterService.state(), new MembershipAction.JoinCallback() { - @Override - public void onSuccess() { - } - - @Override - public void onFailure(Exception e) { - holder.set((IllegalStateException) e); - } - }); - - assertThat(holder.get(), notNullValue()); - assertThat(holder.get().getMessage(), equalTo("Can't handle join request from a node with a version [" + previousMajorVersion - + "] that is lower than the minimum compatible version [" + Version.CURRENT.minimumCompatibilityVersion() + "]")); - } - - public void testJoinElectedMaster_incompatibleMinVersion() { - ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY); - - DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), emptyMap(), - Collections.singleton(DiscoveryNode.Role.MASTER), Version.CURRENT); - assertThat(electMasterService.electMaster(Collections.singletonList(node)), sameInstance(node)); - node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), emptyMap(), emptySet(), previousMajorVersion); - assertThat("Can't join master because version " + previousMajorVersion - + " is lower than the minimum compatable version " + Version.CURRENT + " can support", - electMasterService.electMaster(Collections.singletonList(node)), nullValue()); - } - public void testDiscoveryStats() throws IOException { String expectedStatsJsonResponse = "{\n" + " \"discovery\" : {\n" + diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index b9f65016048a0..a7291dc37366e 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import 
org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingService; import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener; @@ -55,8 +54,8 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState; -import static org.elasticsearch.discovery.zen.elect.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.containsString; @@ -128,7 +127,7 @@ public void testFilterNonMasterPingResponse() { Set roles = new HashSet<>(randomSubsetOf(Arrays.asList(Role.values()))); DiscoveryNode node = new DiscoveryNode("node_" + i, "id_" + i, LocalTransportAddress.buildUnique(), Collections.emptyMap(), roles, Version.CURRENT); - responses.add(new ZenPing.PingResponse(node, randomBoolean() ? null : node, new ClusterName("test"), randomBoolean())); + responses.add(new ZenPing.PingResponse(node, randomBoolean() ? 
null : node, new ClusterName("test"), randomLong())); allNodes.add(node); if (node.isMasterNode()) { masterNodes.add(node); @@ -136,8 +135,7 @@ public void testFilterNonMasterPingResponse() { } boolean ignore = randomBoolean(); - List filtered = ZenDiscovery.filterPingResponses( - responses.toArray(new ZenPing.PingResponse[responses.size()]), ignore, logger); + List filtered = ZenDiscovery.filterPingResponses(responses, ignore, logger); final List filteredNodes = filtered.stream().map(ZenPing.PingResponse::node).collect(Collectors.toList()); if (ignore) { assertThat(filteredNodes, equalTo(masterNodes)); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java index 72674f44e3dc9..2275756e8eeea 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenPingTests.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.List; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -39,7 +40,7 @@ public void testPingCollection() { DiscoveryNode[] nodes = new DiscoveryNode[randomIntBetween(1, 30)]; long maxIdPerNode[] = new long[nodes.length]; DiscoveryNode masterPerNode[] = new DiscoveryNode[nodes.length]; - boolean hasJoinedOncePerNode[] = new boolean[nodes.length]; + long clusterStateVersionPerNode[] = new long[nodes.length]; ArrayList pings = new ArrayList<>(); for (int i = 0; i < nodes.length; i++) { nodes[i] = new DiscoveryNode("" + i, LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); @@ -51,9 +52,9 @@ public void testPingCollection() { if (randomBoolean()) { masterNode = nodes[randomInt(nodes.length - 1)]; } - boolean hasJoinedOnce = randomBoolean(); + long clusterStateVersion = randomLong(); ZenPing.PingResponse ping = new ZenPing.PingResponse(nodes[node], masterNode, 
ClusterName.CLUSTER_NAME_SETTING. - getDefault(Settings.EMPTY), hasJoinedOnce); + getDefault(Settings.EMPTY), clusterStateVersion); if (rarely()) { // ignore some pings continue; @@ -61,7 +62,7 @@ public void testPingCollection() { // update max ping info maxIdPerNode[node] = ping.id(); masterPerNode[node] = masterNode; - hasJoinedOncePerNode[node] = hasJoinedOnce; + clusterStateVersionPerNode[node] = clusterStateVersion; pings.add(ping); } @@ -69,15 +70,15 @@ public void testPingCollection() { Collections.shuffle(pings, random()); ZenPing.PingCollection collection = new ZenPing.PingCollection(); - collection.addPings(pings.toArray(new ZenPing.PingResponse[pings.size()])); + collection.addPings(pings); - ZenPing.PingResponse[] aggregate = collection.toArray(); + List aggregate = collection.toList(); for (ZenPing.PingResponse ping : aggregate) { int nodeId = Integer.parseInt(ping.node().getId()); assertThat(maxIdPerNode[nodeId], equalTo(ping.id())); assertThat(masterPerNode[nodeId], equalTo(ping.master())); - assertThat(hasJoinedOncePerNode[nodeId], equalTo(ping.hasJoinedOnce())); + assertThat(clusterStateVersionPerNode[nodeId], equalTo(ping.getClusterStateVersion())); maxIdPerNode[nodeId] = -1; // mark as seen } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java similarity index 85% rename from core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java rename to core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java index ea5779c33bbab..e04b0b52d81cc 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -20,6 +20,9 @@ package org.elasticsearch.discovery.zen.ping.unicast; import org.elasticsearch.Version; +import 
org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -31,7 +34,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -45,16 +48,18 @@ import org.elasticsearch.transport.TransportSettings; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.Collections; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -public class UnicastZenPingIT extends ESTestCase { +public class UnicastZenPingTests extends ESTestCase { public void testSimplePings() throws InterruptedException { int startPort = 11000 + randomIntBetween(0, 1000); int endPort = startPort + 10; @@ -78,6 +83,8 @@ public void testSimplePings() throws InterruptedException { Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion); NetworkHandle handleD = startServices(settingsMismatch, threadPool, networkService, "UZP_D", versionD); + final ClusterState state = ClusterState.builder(new 
ClusterName("test")).version(randomPositiveLong()).build(); + Settings hostsSettings = Settings.builder() .putArray("discovery.zen.ping.unicast.hosts", NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())), @@ -96,8 +103,8 @@ public DiscoveryNodes nodes() { } @Override - public boolean nodeHasJoinedClusterOnce() { - return false; + public ClusterState clusterState() { + return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); } }); zenPingA.start(); @@ -110,8 +117,8 @@ public DiscoveryNodes nodes() { } @Override - public boolean nodeHasJoinedClusterOnce() { - return true; + public ClusterState clusterState() { + return state; } }); zenPingB.start(); @@ -130,8 +137,8 @@ public DiscoveryNodes nodes() { } @Override - public boolean nodeHasJoinedClusterOnce() { - return false; + public ClusterState clusterState() { + return state; } }); zenPingC.start(); @@ -144,36 +151,38 @@ public DiscoveryNodes nodes() { } @Override - public boolean nodeHasJoinedClusterOnce() { - return false; + public ClusterState clusterState() { + return state; } }); zenPingD.start(); try { logger.info("ping from UZP_A"); - ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.length, equalTo(1)); - assertThat(pingResponses[0].node().getId(), equalTo("UZP_B")); - assertTrue(pingResponses[0].hasJoinedOnce()); + Collection pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.size(), equalTo(1)); + ZenPing.PingResponse ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_B")); + assertThat(ping.getClusterStateVersion(), equalTo(state.version())); assertCounters(handleA, handleA, handleB, handleC, handleD); // ping again, this time from B, logger.info("ping from UZP_B"); pingResponses = 
zenPingB.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.length, equalTo(1)); - assertThat(pingResponses[0].node().getId(), equalTo("UZP_A")); - assertFalse(pingResponses[0].hasJoinedOnce()); + assertThat(pingResponses.size(), equalTo(1)); + ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_A")); + assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION)); assertCounters(handleB, handleA, handleB, handleC, handleD); logger.info("ping from UZP_C"); pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.length, equalTo(0)); + assertThat(pingResponses.size(), equalTo(0)); assertCounters(handleC, handleA, handleB, handleC, handleD); logger.info("ping from UZP_D"); pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.length, equalTo(0)); + assertThat(pingResponses.size(), equalTo(0)); assertCounters(handleD, handleA, handleB, handleC, handleD); } finally { zenPingA.close(); diff --git a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index 9b340fd863a96..a998b56f64090 100644 --- a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -38,7 +38,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.mapper.MapperParsingException; diff --git a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java 
b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java index a1d16bfd884fd..c820bccae51ad 100644 --- a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java @@ -23,7 +23,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index f512f1da5387b..3a045c80ac807 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -44,8 +44,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.ttl.IndicesTTLService; diff --git a/docs/resiliency/index.asciidoc b/docs/resiliency/index.asciidoc index 258b880f88f99..bb2f384bc17e7 100644 --- a/docs/resiliency/index.asciidoc +++ b/docs/resiliency/index.asciidoc @@ -63,6 +63,22 @@ to create new scenarios. We have currently ported all published Jepsen scenarios framework. 
As the Jepsen tests evolve, we will continue porting new scenarios that are not covered yet. We are committed to investigating all new scenarios and will report issues that we find on this page and in our GitHub repository. +[float] +=== Repeated network partitions can cause cluster state updates to be lost (STATUS: ONGOING) + +During a network partition, cluster state updates (like mapping changes or shard assignments) +are committed if a majority of the master-eligible nodes receive the update correctly. This means that the current master has access +to enough nodes in the cluster to continue to operate correctly. When the network partition heals, the isolated nodes catch +up with the current state and receive the previously missed changes. However, if a second partition happens while the cluster +is still recovering from the previous one *and* the old master falls on the minority side, it may be that a new master is elected +which has not yet caught up. If that happens, cluster state updates can be lost. + +This problem is mostly fixed by {GIT}20384[#20384] (v5.0.0), which takes committed cluster state updates into account during master +election. This considerably reduces the chance of this rare problem occurring but does not fully mitigate it. If the second partition +happens concurrently with a cluster state update and blocks the cluster state commit message from reaching a majority of nodes, it may be +that the in-flight update will be lost. If the now-isolated master can still acknowledge the cluster state update to the client, this +will amount to the loss of an acknowledged change. Fixing that last scenario needs considerable work and is currently targeted at v6.0.0. 
+ [float] === Better request retry mechanism when nodes are disconnected (STATUS: ONGOING) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 833c27f9c551b..59669ba8478ee 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -100,7 +100,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; @@ -368,14 +368,14 @@ public void randomIndexTemplate() throws IOException { // TODO move settings for random directory etc here into the index based randomized settings. if (cluster().size() > 0) { Settings.Builder randomSettingsBuilder = - setRandomIndexSettings(random(), Settings.builder()); + setRandomIndexSettings(random(), Settings.builder()); if (isInternalCluster()) { // this is only used by mock plugins and if the cluster is not internal we just can't set it randomSettingsBuilder.put(INDEX_TEST_SEED_SETTING.getKey(), random().nextLong()); } randomSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, numberOfShards()) - .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas()); + .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas()); // if the test class is annotated with SuppressCodecs("*"), it means don't use lucene's codec randomization // otherwise, use it, it has assertions and so on that can find bugs. 
@@ -404,10 +404,10 @@ public void randomIndexTemplate() throws IOException { randomSettingsBuilder.put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), randomBoolean()); } PutIndexTemplateRequestBuilder putTemplate = client().admin().indices() - .preparePutTemplate("random_index_template") - .setTemplate("*") - .setOrder(0) - .setSettings(randomSettingsBuilder); + .preparePutTemplate("random_index_template") + .setTemplate("*") + .setOrder(0) + .setSettings(randomSettingsBuilder); if (mappings != null) { logger.info("test using _default_ mappings: [{}]", mappings.bytes().utf8ToString()); putTemplate.addMapping("_default_", mappings); @@ -443,7 +443,7 @@ protected Settings.Builder setRandomIndexSettings(Random random, Settings.Builde private static Settings.Builder setRandomIndexMergeSettings(Random random, Settings.Builder builder) { if (random.nextBoolean()) { builder.put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), - random.nextBoolean() ? random.nextDouble() : random.nextBoolean()); + random.nextBoolean() ? 
random.nextDouble() : random.nextBoolean()); } switch (random.nextInt(4)) { case 3: @@ -525,9 +525,9 @@ protected final void afterInternal(boolean afterClass) throws Exception { if (currentClusterScope != Scope.TEST) { MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData(); assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().getAsMap(), metaData - .persistentSettings().getAsMap().size(), equalTo(0)); + .persistentSettings().getAsMap().size(), equalTo(0)); assertThat("test leaves transient cluster metadata behind: " + metaData.transientSettings().getAsMap(), metaData - .transientSettings().getAsMap().size(), equalTo(0)); + .transientSettings().getAsMap().size(), equalTo(0)); } ensureClusterSizeConsistency(); ensureClusterStateConsistency(); @@ -540,7 +540,7 @@ protected final void afterInternal(boolean afterClass) throws Exception { @Override public void run() { assertThat("still having pending states: " + Strings.arrayToDelimitedString(zenDiscovery.pendingClusterStates(), "\n"), - zenDiscovery.pendingClusterStates(), emptyArray()); + zenDiscovery.pendingClusterStates(), emptyArray()); } }); } @@ -829,7 +829,7 @@ public void assertResultsAndLogOnFailure(long expectedResults, SearchResponse se String failMsg = sb.toString(); for (SearchHit hit : searchResponse.getHits().getHits()) { sb.append("\n-> _index: [").append(hit.getIndex()).append("] type [").append(hit.getType()) - .append("] id [").append(hit.id()).append("]"); + .append("] id [").append(hit.id()).append("]"); } logger.warn("{}", sb); fail(failMsg); @@ -873,7 +873,7 @@ public ClusterHealthStatus ensureGreen(String... indices) { */ public ClusterHealthStatus ensureGreen(TimeValue timeout, String... 
indices) { ClusterHealthResponse actionGet = client().admin().cluster() - .health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForNoRelocatingShards(true)).actionGet(); + .health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForNoRelocatingShards(true)).actionGet(); if (actionGet.isTimedOut()) { logger.info("ensureGreen timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); fail("timed out waiting for green state"); @@ -900,7 +900,7 @@ public ClusterHealthStatus waitForRelocation(ClusterHealthStatus status) { request.waitForStatus(status); } ClusterHealthResponse actionGet = client().admin().cluster() - .health(request).actionGet(); + .health(request).actionGet(); if (actionGet.isTimedOut()) { logger.info("waitForRelocation timed out (status={}), cluster state:\n{}\n{}", status, client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false)); @@ -945,7 +945,7 @@ public long waitForDocs(final long numDocs, @Nullable final BackgroundIndexer in * @return the actual number of docs seen. 
*/ public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, @Nullable final BackgroundIndexer indexer) - throws InterruptedException { + throws InterruptedException { final AtomicLong lastKnownCount = new AtomicLong(-1); long lastStartCount = -1; BooleanSupplier testDocs = () -> { @@ -988,8 +988,8 @@ public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTim */ public void setMinimumMasterNodes(int n) { assertTrue(client().admin().cluster().prepareUpdateSettings().setTransientSettings( - Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), n)) - .get().isAcknowledged()); + Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), n)) + .get().isAcknowledged()); } /** @@ -997,7 +997,7 @@ public void setMinimumMasterNodes(int n) { */ public ClusterHealthStatus ensureYellow(String... indices) { ClusterHealthResponse actionGet = client().admin().cluster() - .health(Requests.clusterHealthRequest(indices).waitForNoRelocatingShards(true).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet(); + .health(Requests.clusterHealthRequest(indices).waitForNoRelocatingShards(true).waitForYellowStatus().waitForEvents(Priority.LANGUID)).actionGet(); if (actionGet.isTimedOut()) { logger.info("ensureYellow timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); assertThat("timed out waiting for yellow", actionGet.isTimedOut(), equalTo(false)); @@ -1019,7 +1019,7 @@ public void logClusterState() { public void logSegmentsState(String... indices) throws Exception { IndicesSegmentResponse segsRsp = client().admin().indices().prepareSegments(indices).get(); logger.debug("segments {} state: \n{}", indices.length == 0 ? 
"[_all]" : indices, - segsRsp.toXContent(JsonXContent.contentBuilder().prettyPrint(), ToXContent.EMPTY_PARAMS).string()); + segsRsp.toXContent(JsonXContent.contentBuilder().prettyPrint(), ToXContent.EMPTY_PARAMS).string()); } /** @@ -1102,16 +1102,16 @@ protected void ensureStableCluster(int nodeCount, TimeValue timeValue, boolean l } logger.debug("ensuring cluster is stable with [{}] nodes. access node: [{}]. timeout: [{}]", nodeCount, viaNode, timeValue); ClusterHealthResponse clusterHealthResponse = client(viaNode).admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes(Integer.toString(nodeCount)) - .setTimeout(timeValue) - .setLocal(local) - .setWaitForNoRelocatingShards(true) - .get(); + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCount)) + .setTimeout(timeValue) + .setLocal(local) + .setWaitForNoRelocatingShards(true) + .get(); if (clusterHealthResponse.isTimedOut()) { ClusterStateResponse stateResponse = client(viaNode).admin().cluster().prepareState().get(); fail("failed to reach a stable cluster of [" + nodeCount + "] nodes. Tried via [" + viaNode + "]. last cluster state:\n" - + stateResponse.getState().prettyPrint()); + + stateResponse.getState().prettyPrint()); } assertThat(clusterHealthResponse.isTimedOut(), is(false)); } @@ -1234,7 +1234,7 @@ protected boolean indexExists(String index) { */ protected final void enableAllocation(String... indices) { client().admin().indices().prepareUpdateSettings(indices).setSettings(Settings.builder().put( - EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "all" + EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "all" )).get(); } @@ -1243,7 +1243,7 @@ protected final void enableAllocation(String... indices) { */ protected final void disableAllocation(String... 
indices) { client().admin().indices().prepareUpdateSettings(indices).setSettings(Settings.builder().put( - EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none" + EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none" )).get(); } @@ -1357,7 +1357,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma } } else { List> partition = eagerPartition(builders, Math.min(MAX_BULK_INDEX_REQUEST_SIZE, - Math.max(1, (int) (builders.size() * randomDouble())))); + Math.max(1, (int) (builders.size() * randomDouble())))); logger.info("Index [{}] docs async: [{}] bulk: [{}] partitions [{}]", builders.size(), false, true, partition.size()); for (List segmented : partition) { BulkRequestBuilder bulkBuilder = client().prepareBulk(); @@ -1426,18 +1426,18 @@ private void postIndexAsyncActions(String[] indices, List inFlig if (rarely()) { if (rarely()) { client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute( - new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); + new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); } else if (maybeFlush && rarely()) { if (randomBoolean()) { client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute( - new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); + new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); } else { client().admin().indices().syncedFlush(syncedFlushRequest(indices).indicesOptions(IndicesOptions.lenientExpandOpen()), new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); } } else if (rarely()) { client().admin().indices().prepareForceMerge(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).setMaxNumSegments(between(1, 10)).setFlush(maybeFlush && randomBoolean()).execute( - new LatchedActionListener<>(newLatch(inFlightAsyncOperations))); + new 
LatchedActionListener<>(newLatch(inFlightAsyncOperations))); } } while (inFlightAsyncOperations.size() > MAX_IN_FLIGHT_ASYNC_INDEXES) { @@ -1567,7 +1567,7 @@ protected void addError(Exception e) { */ public void clearScroll(String... scrollIds) { ClearScrollResponse clearResponse = client().prepareClearScroll() - .setScrollIds(Arrays.asList(scrollIds)).get(); + .setScrollIds(Arrays.asList(scrollIds)).get(); assertThat(clearResponse.isSucceeded(), equalTo(true)); } @@ -1631,20 +1631,20 @@ private boolean randomDynamicTemplates() { */ protected Settings nodeSettings(int nodeOrdinal) { Settings.Builder builder = Settings.builder() - .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE) - // Default the watermarks to absurdly low to prevent the tests - // from failing on nodes without enough disk space - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b") - .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b") - .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000) - .put("script.stored", "true") - .put("script.inline", "true") - // by default we never cache below 10k docs in a segment, - // bypass this limit so that caching gets some testing in - // integration tests that usually create few documents - .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), nodeOrdinal % 2 == 0) - // wait short time for other active shards before actually deleting, default 30s not needed in tests - .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)); + .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE) + // Default the watermarks to absurdly low to prevent the tests + // from failing on nodes without enough disk space + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b") + 
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b") + .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000) + .put("script.stored", "true") + .put("script.inline", "true") + // by default we never cache below 10k docs in a segment, + // bypass this limit so that caching gets some testing in + // integration tests that usually create few documents + .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), nodeOrdinal % 2 == 0) + // wait short time for other active shards before actually deleting, default 30s not needed in tests + .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)); return builder.build(); } @@ -1739,8 +1739,8 @@ protected TestCluster buildTestCluster(Scope scope, long seed) throws IOExceptio mockPlugins = mocks; } return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, minNumDataNodes, maxNumDataNodes, - InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(), - InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper()); + InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(), + InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper()); } protected NodeConfigurationSource getNodeConfigSource() { @@ -1772,7 +1772,7 @@ public Settings nodeSettings(int nodeOrdinal) { .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), isNetwork ? DiscoveryModule.DISCOVERY_TYPE_SETTING.getDefault(Settings.EMPTY) : "local") .put(networkSettings.build()). - put(ESIntegTestCase.this.nodeSettings(nodeOrdinal)).build(); + put(ESIntegTestCase.this.nodeSettings(nodeOrdinal)).build(); } @Override @@ -2071,8 +2071,8 @@ protected Settings prepareBackwardsDataDir(Path backwardsIndex, Object... 
settin assertFalse(Files.exists(src)); assertTrue(Files.exists(dest)); Settings.Builder builder = Settings.builder() - .put(settings) - .put(Environment.PATH_DATA_SETTING.getKey(), dataDir.toAbsolutePath()); + .put(settings) + .put(Environment.PATH_DATA_SETTING.getKey(), dataDir.toAbsolutePath()); Path configDir = indexDir.resolve("config"); if (Files.exists(configDir)) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index e49e6d4aa408d..6a5493ff1eb80 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1660,10 +1660,18 @@ public void setDisruptionScheme(ServiceDisruptionScheme scheme) { } public void clearDisruptionScheme() { + clearDisruptionScheme(true); + } + + public void clearDisruptionScheme(boolean ensureHealthyCluster) { if (activeDisruptionScheme != null) { TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal(); logger.info("Clearing active scheme {}, expected healing time {}", activeDisruptionScheme, expectedHealingTime); - activeDisruptionScheme.removeAndEnsureHealthy(this); + if (ensureHealthyCluster) { + activeDisruptionScheme.removeAndEnsureHealthy(this); + } else { + activeDisruptionScheme.removeFromCluster(this); + } } activeDisruptionScheme = null; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java index 4e135c4c2b0bf..f7094d8ae9feb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java @@ -328,6 +328,18 @@ public String toString() { } } + public static class IsolateAllNodes extends DisruptedLinks { + + public 
IsolateAllNodes(Set nodes) { + super(nodes); + } + + @Override + public boolean disrupt(String node1, String node2) { + return true; + } + } + /** * Abstract class representing various types of network disruptions. Instances of this class override the {@link #applyDisruption} * method to apply their specific disruption type to requests that are send from a source to a target node. diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java index 4d0f1123a1bee..edc261c175959 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java @@ -56,6 +56,21 @@ private void checkTwoPartitions(TwoPartitions topology, Set partition1, assertTrue(topology.getMajoritySide().size() >= topology.getMinoritySide().size()); } + public void testIsolateAll() { + Set nodes = generateRandomStringSet(1, 10); + NetworkDisruption.DisruptedLinks topology = new NetworkDisruption.IsolateAllNodes(nodes); + for (int i = 0; i < 10; i++) { + final String node1 = randomFrom(nodes); + final String node2 = randomFrom(nodes); + if (node1.equals(node2)) { + continue; + } + assertTrue(topology.nodes().contains(node1)); + assertTrue(topology.nodes().contains(node2)); + assertTrue(topology.disrupt(node1, node2)); + } + } + public void testBridge() { Set partition1 = generateRandomStringSet(1, 10); Set partition2 = generateRandomStringSet(1, 10);