Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add current cluster state version to zen pings and use them in master election #20384

Merged
merged 16 commits into from
Sep 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.env.Environment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@

package org.elasticsearch.discovery;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ExtensionPoint;
import org.elasticsearch.discovery.local.LocalDiscovery;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
* under the License.
*/

package org.elasticsearch.discovery.zen.elect;
package org.elasticsearch.discovery.zen;

import com.carrotsearch.hppc.ObjectContainer;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
Expand All @@ -33,9 +32,11 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
*
Expand All @@ -45,17 +46,64 @@ public class ElectMasterService extends AbstractComponent {
public static final Setting<Integer> DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING =
Setting.intSetting("discovery.zen.minimum_master_nodes", -1, Property.Dynamic, Property.NodeScope);

// This is the minimum version a master needs to be on, otherwise it gets ignored
// This is based on the minimum compatible version of the current version this node is on
private final Version minMasterVersion;
private final NodeComparator nodeComparator = new NodeComparator();

private volatile int minimumMasterNodes;

/**
* a class to encapsulate all the information about a candidate in a master election
* that is needed to decided which of the candidates should win
*/
public static class MasterCandidate {

public static final long UNRECOVERED_CLUSTER_VERSION = -1;

final DiscoveryNode node;

final long clusterStateVersion;

public MasterCandidate(DiscoveryNode node, long clusterStateVersion) {
Objects.requireNonNull(node);
assert clusterStateVersion >= -1 : "got: " + clusterStateVersion;
assert node.isMasterNode();
this.node = node;
this.clusterStateVersion = clusterStateVersion;
}

public DiscoveryNode getNode() {
return node;
}

public long getClusterStateVersion() {
return clusterStateVersion;
}

@Override
public String toString() {
return "Candidate{" +
"node=" + node +
", clusterStateVersion=" + clusterStateVersion +
'}';
}

/**
* compares two candidates to indicate which the a better master.
* A higher cluster state version is better
*
* @return -1 if c1 is a batter candidate, 1 if c2.
*/
public static int compare(MasterCandidate c1, MasterCandidate c2) {
// we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
// list, so if c2 has a higher cluster state version, it needs to come first.
int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
if (ret == 0) {
ret = compareNodes(c1.getNode(), c2.getNode());
}
return ret;
}
}

@Inject
public ElectMasterService(Settings settings) {
super(settings);
this.minMasterVersion = Version.CURRENT.minimumCompatibilityVersion();
this.minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
}
Expand All @@ -69,16 +117,41 @@ public int minimumMasterNodes() {
}

public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {
if (minimumMasterNodes < 1) {
return true;
}
int count = 0;
for (DiscoveryNode node : nodes) {
if (node.isMasterNode()) {
count++;
}
}
return count >= minimumMasterNodes;
return count > 0 && (minimumMasterNodes < 0 || count >= minimumMasterNodes);
}

public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
if (candidates.isEmpty()) {
return false;
}
if (minimumMasterNodes < 1) {
return true;
}
assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
"duplicates ahead: " + candidates;
return candidates.size() >= minimumMasterNodes;
}

/**
* Elects a new master out of the possible nodes, returning it. Returns <tt>null</tt>
* if no master has been elected.
*/
public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
assert hasEnoughCandidates(candidates);
List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
sortedCandidates.sort(MasterCandidate::compare);
return sortedCandidates.get(0);
}

/** selects the best active master to join, where multiple are discovered */
public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
return activeMasters.stream().min(ElectMasterService::compareNodes).get();
}

public boolean hasTooManyMasterNodes(Iterable<DiscoveryNode> nodes) {
Expand Down Expand Up @@ -107,7 +180,7 @@ public void logMinimumMasterNodesWarningIfNecessary(ClusterState oldState, Clust
*/
public List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {
ArrayList<DiscoveryNode> sortedNodes = CollectionUtils.iterableAsArrayList(nodes);
CollectionUtil.introSort(sortedNodes, nodeComparator);
CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes);
return sortedNodes;
}

Expand All @@ -130,25 +203,6 @@ public DiscoveryNode[] nextPossibleMasters(ObjectContainer<DiscoveryNode> nodes,
return nextPossibleMasters.toArray(new DiscoveryNode[nextPossibleMasters.size()]);
}

/**
* Elects a new master out of the possible nodes, returning it. Returns <tt>null</tt>
* if no master has been elected.
*/
public DiscoveryNode electMaster(Iterable<DiscoveryNode> nodes) {
List<DiscoveryNode> sortedNodes = sortedMasterNodes(nodes);
if (sortedNodes == null || sortedNodes.isEmpty()) {
return null;
}
DiscoveryNode masterNode = sortedNodes.get(0);
// Sanity check: maybe we don't end up here, because serialization may have failed.
if (masterNode.getVersion().before(minMasterVersion)) {
logger.warn("ignoring master [{}], because the version [{}] is lower than the minimum compatible version [{}]", masterNode, masterNode.getVersion(), minMasterVersion);
return null;
} else {
return masterNode;
}
}

private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
List<DiscoveryNode> possibleNodes = CollectionUtils.iterableAsArrayList(nodes);
if (possibleNodes.isEmpty()) {
Expand All @@ -161,21 +215,18 @@ private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
it.remove();
}
}
CollectionUtil.introSort(possibleNodes, nodeComparator);
CollectionUtil.introSort(possibleNodes, ElectMasterService::compareNodes);
return possibleNodes;
}

private static class NodeComparator implements Comparator<DiscoveryNode> {

@Override
public int compare(DiscoveryNode o1, DiscoveryNode o2) {
if (o1.isMasterNode() && !o2.isMasterNode()) {
return -1;
}
if (!o1.isMasterNode() && o2.isMasterNode()) {
return 1;
}
return o1.getId().compareTo(o2.getId());
/** master nodes go before other nodes, with a secondary sort by id **/
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
if (o1.isMasterNode() && !o2.isMasterNode()) {
return -1;
}
if (!o1.isMasterNode() && o2.isMasterNode()) {
return 1;
}
return o1.getId().compareTo(o2.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.membership.MembershipAction;

import java.util.ArrayList;
Expand Down
Loading