-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Zen2] Introduce gossip-like discovery of master nodes #32246
Changes from 67 commits
aaa8753
321def8
8e9c881
c870c94
686a434
83f7189
1525607
0b4e9d1
d524a62
93ce793
38ef542
578a57f
6ec315c
a933a7f
dddeecd
2177d0a
320ebdd
3712208
a566ffc
5f12c03
dcfa1ca
53c1dff
0120029
3fab139
4701d35
4b70b2e
ffe6637
b219998
52834a1
354c94e
94cf9a3
c7bcae9
7d2a5a6
9e156df
c805add
7aec464
33b17ce
b00795c
0e9850e
f466b44
3c219de
9356a26
7afe75c
27e95cf
5374f2b
1308444
b7c598b
417e6e2
4ca2e62
f034f18
380d930
34b6fd4
98cd79a
e3f9638
253e2ea
5d97de7
fb171e7
8126e86
01e230f
8e3b175
4c52e50
121aad3
c07c7d4
1d4adcc
f0e7155
c19b3df
2051db6
221253a
e5414a1
3608505
2013f10
8b605ad
9aa9003
b517ce9
253a994
7948268
b4df5f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,377 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.cluster.coordination; | ||
|
||
import com.carrotsearch.hppc.cursors.ObjectCursor; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.apache.lucene.util.SetOnce; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.node.DiscoveryNodes; | ||
import org.elasticsearch.common.Nullable; | ||
import org.elasticsearch.common.component.AbstractComponent; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.transport.TransportAddress; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
import org.elasticsearch.discovery.ConfiguredHostsResolver; | ||
import org.elasticsearch.threadpool.ThreadPool.Names; | ||
import org.elasticsearch.transport.TransportException; | ||
import org.elasticsearch.transport.TransportResponseHandler; | ||
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.io.IOException; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; | ||
|
||
public abstract class PeerFinder extends AbstractComponent { | ||
|
||
public static final String REQUEST_PEERS_ACTION_NAME = "internal:discovery/request_peers"; | ||
|
||
// the time between attempts to find all peers | ||
public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_SETTING = | ||
Setting.timeSetting("discovery.find_peers_interval", | ||
TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); | ||
|
||
private final TimeValue findPeersDelay; | ||
|
||
private final Object mutex = new Object(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new structure suggests that perhaps we don't need such coarse locking, but I do not think this is worth pursuing until we have some evidence that we need something finer. This is not very high-throughput code. |
||
private final TransportService transportService; | ||
private final FutureExecutor futureExecutor; | ||
private final TransportAddressConnector transportAddressConnector; | ||
private final ConfiguredHostsResolver configuredHostsResolver; | ||
|
||
private volatile long currentTerm; | ||
private boolean active; | ||
private DiscoveryNodes lastAcceptedNodes; | ||
private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap(); | ||
private Optional<DiscoveryNode> leader = Optional.empty(); | ||
|
||
PeerFinder(Settings settings, TransportService transportService, FutureExecutor futureExecutor, | ||
TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) { | ||
super(settings); | ||
findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); | ||
this.transportService = transportService; | ||
this.futureExecutor = futureExecutor; | ||
this.transportAddressConnector = transportAddressConnector; | ||
this.configuredHostsResolver = configuredHostsResolver; | ||
|
||
transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false, | ||
PeersRequest::new, | ||
(request, channel, task) -> channel.sendResponse(handlePeersRequest(request))); | ||
} | ||
|
||
public void activate(final DiscoveryNodes lastAcceptedNodes) { | ||
logger.trace("activating PeerFinder {}", lastAcceptedNodes); | ||
|
||
synchronized (mutex) { | ||
assert active == false; | ||
active = true; | ||
this.lastAcceptedNodes = lastAcceptedNodes; | ||
leader = Optional.empty(); | ||
handleWakeUp(); | ||
} | ||
} | ||
|
||
public void deactivate(DiscoveryNode leader) { | ||
synchronized (mutex) { | ||
logger.trace("deactivating PeerFinder and setting leader to {}", leader); | ||
active = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. assert active == false? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The tests deactivate the Also although I am pretty sure that we can't change the leader without being active, this is not something we guarantee, nor do I think we need to. |
||
handleWakeUp(); | ||
this.leader = Optional.of(leader); | ||
assert assertInactiveWithNoKnownPeers(); | ||
} | ||
} | ||
|
||
// exposed to subclasses for testing | ||
protected final boolean holdsLock() { | ||
return Thread.holdsLock(mutex); | ||
} | ||
|
||
boolean assertInactiveWithNoKnownPeers() { | ||
assert active == false; | ||
assert peersByAddress.isEmpty(); | ||
return true; | ||
} | ||
|
||
PeersResponse handlePeersRequest(PeersRequest peersRequest) { | ||
synchronized (mutex) { | ||
assert peersRequest.getSourceNode().equals(getLocalNode()) == false; | ||
if (active) { | ||
startProbe(peersRequest.getSourceNode().getAddress()); | ||
peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe); | ||
return new PeersResponse(Optional.empty(), getFoundPeersUnderLock(), currentTerm); | ||
} else { | ||
return new PeersResponse(leader, Collections.emptyList(), currentTerm); | ||
} | ||
} | ||
} | ||
|
||
public void setCurrentTerm(long currentTerm) { | ||
this.currentTerm = currentTerm; | ||
} | ||
|
||
private DiscoveryNode getLocalNode() { | ||
final DiscoveryNode localNode = transportService.getLocalNode(); | ||
assert localNode != null; | ||
return localNode; | ||
} | ||
|
||
/** | ||
* Called on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join. | ||
*/ | ||
protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term); | ||
|
||
public interface TransportAddressConnector { | ||
/** | ||
* Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This probably means that we repeatedly establish (light-weight) connections to nodes in the unicast host pings list that are not master-eligible? Why is the TransportAddressConnector implementation not part of the PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is benign, but there are things we could do in the This PR is large enough without another implementation and test suite - the real implementation can come in a followup. |
||
*/ | ||
void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener); | ||
} | ||
|
||
public Iterable<DiscoveryNode> getFoundPeers() { | ||
synchronized (mutex) { | ||
return getFoundPeersUnderLock(); | ||
} | ||
} | ||
|
||
private List<DiscoveryNode> getFoundPeersUnderLock() { | ||
assert active; | ||
assert holdsLock() : "PeerFinder mutex not held"; | ||
return peersByAddress.values().stream().map(Peer::getDiscoveryNode).filter(Objects::nonNull).collect(Collectors.toList()); | ||
} | ||
|
||
private Peer createConnectingPeer(TransportAddress transportAddress) { | ||
Peer peer = new Peer(transportAddress); | ||
peer.establishConnection(); | ||
return peer; | ||
} | ||
|
||
private void handleWakeUp() { | ||
assert holdsLock() : "PeerFinder mutex not held"; | ||
|
||
for (final Peer peer : peersByAddress.values()) { | ||
peer.handleWakeUp(); | ||
} | ||
|
||
if (active == false) { | ||
logger.trace("PeerFinder: not running"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we even need to say that this is PeerFinder? We have a logger for PeerFinder that already spits out the class name (same thing for other logging occurrences in this class). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Of course. I pushed 7948268. |
||
return; | ||
} | ||
|
||
for (ObjectCursor<DiscoveryNode> discoveryNodeObjectCursor : lastAcceptedNodes.getMasterNodes().values()) { | ||
startProbe(discoveryNodeObjectCursor.value.getAddress()); | ||
} | ||
|
||
configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> { | ||
synchronized (mutex) { | ||
logger.trace("PeerFinder: probing resolved transport addresses {}", providedAddresses); | ||
providedAddresses.forEach(this::startProbe); | ||
} | ||
}); | ||
|
||
futureExecutor.schedule(new AbstractRunnable() { | ||
@Override | ||
public boolean isForceExecution() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
assert false : e; | ||
logger.debug("unexpected exception in PeerFinder wakeup", e); | ||
} | ||
|
||
@Override | ||
protected void doRun() { | ||
synchronized (mutex) { | ||
handleWakeUp(); | ||
} | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "PeerFinder::handleWakeUp"; | ||
} | ||
}, findPeersDelay); | ||
} | ||
|
||
private void startProbe(TransportAddress transportAddress) { | ||
assert holdsLock() : "PeerFinder mutex not held"; | ||
if (active == false) { | ||
logger.trace("startProbe({}) not running", transportAddress); | ||
return; | ||
} | ||
|
||
if (transportAddress.equals(getLocalNode().getAddress())) { | ||
logger.trace("startProbe({}) not probing local node", transportAddress); | ||
return; | ||
} | ||
|
||
peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer); | ||
} | ||
|
||
private class Peer { | ||
private final TransportAddress transportAddress; | ||
private SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>(); | ||
private volatile boolean peersRequestInFlight; | ||
|
||
Peer(TransportAddress transportAddress) { | ||
this.transportAddress = transportAddress; | ||
} | ||
|
||
@Nullable | ||
DiscoveryNode getDiscoveryNode() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See 4c52e50 |
||
return discoveryNode.get(); | ||
} | ||
|
||
void handleWakeUp() { | ||
assert holdsLock() : "PeerFinder mutex not held"; | ||
|
||
if (active == false) { | ||
removePeer(); | ||
return; | ||
} | ||
|
||
final DiscoveryNode discoveryNode = getDiscoveryNode(); | ||
// may be null if connection not yet established | ||
|
||
if (discoveryNode != null) { | ||
if (transportService.nodeConnected(discoveryNode)) { | ||
if (peersRequestInFlight == false) { | ||
requestPeers(); | ||
} | ||
} else { | ||
logger.trace("{} no longer connected to {}", this, discoveryNode); | ||
removePeer(); | ||
} | ||
} | ||
} | ||
|
||
void establishConnection() { | ||
assert holdsLock() : "PeerFinder mutex not held"; | ||
assert getDiscoveryNode() == null : "unexpectedly connected to " + getDiscoveryNode(); | ||
assert active; | ||
|
||
logger.trace("{} attempting connection", this); | ||
transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener<DiscoveryNode>() { | ||
@Override | ||
public void onResponse(DiscoveryNode remoteNode) { | ||
assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible"; | ||
assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node"; | ||
synchronized (mutex) { | ||
if (active) { | ||
assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get(); | ||
discoveryNode.set(remoteNode); | ||
requestPeers(); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e); | ||
removePeer(); | ||
} | ||
}); | ||
} | ||
|
||
private void removePeer() { | ||
final Peer removed = peersByAddress.remove(transportAddress); | ||
assert removed == Peer.this; | ||
} | ||
|
||
private void requestPeers() { | ||
assert holdsLock() : "PeerFinder mutex not held"; | ||
assert peersRequestInFlight == false : "PeersRequest already in flight"; | ||
assert active; | ||
|
||
final DiscoveryNode discoveryNode = getDiscoveryNode(); | ||
assert discoveryNode != null : "cannot request peers without first connecting"; | ||
|
||
logger.trace("{} requesting peers from {}", this, discoveryNode); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, see e5414a1 |
||
peersRequestInFlight = true; | ||
|
||
List<DiscoveryNode> knownNodes = getFoundPeersUnderLock(); | ||
|
||
transportService.sendRequest(discoveryNode, REQUEST_PEERS_ACTION_NAME, | ||
new PeersRequest(getLocalNode(), knownNodes), | ||
new TransportResponseHandler<PeersResponse>() { | ||
|
||
@Override | ||
public PeersResponse read(StreamInput in) throws IOException { | ||
return new PeersResponse(in); | ||
} | ||
|
||
@Override | ||
public void handleResponse(PeersResponse response) { | ||
logger.trace("{} received {} from {}", Peer.this, response, discoveryNode); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. with discoveryNode added to Peer.toString, no need to add discoveryNode here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, see e5414a1 |
||
synchronized (mutex) { | ||
if (active == false) { | ||
return; | ||
} | ||
|
||
peersRequestInFlight = false; | ||
|
||
if (response.getMasterNode().isPresent()) { | ||
final DiscoveryNode masterNode = response.getMasterNode().get(); | ||
if (masterNode.equals(discoveryNode) == false) { | ||
startProbe(masterNode.getAddress()); | ||
} | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should even make this distinction. I think the following should suffice:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's all about the one-liners today. See 221253a. |
||
response.getKnownPeers().stream().map(DiscoveryNode::getAddress) | ||
.forEach(PeerFinder.this::startProbe); | ||
} | ||
} | ||
|
||
if (response.getMasterNode().equals(Optional.of(discoveryNode))) { | ||
// Must not hold lock here to avoid deadlock | ||
assert holdsLock() == false : "PeerFinder mutex is held in error"; | ||
onActiveMasterFound(discoveryNode, response.getTerm()); | ||
} | ||
} | ||
|
||
@Override | ||
public void handleException(TransportException exp) { | ||
peersRequestInFlight = false; | ||
logger.debug("PeersRequest failed", exp); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add Peer.this to message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, see e5414a1 |
||
} | ||
|
||
@Override | ||
public String executor() { | ||
return Names.GENERIC; | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "Peer{" + transportAddress + " peersRequestInFlight=" + peersRequestInFlight + "}"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also add discoveryNode? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, see e5414a1 |
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arguably this should be
org.elasticsearch.discovery
. WDYT?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, moved things around in 2013f10