Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zen2] Introduce gossip-like discovery of master nodes #32246

Merged
merged 77 commits into from
Aug 6, 2018
Merged
Show file tree
Hide file tree
Changes from 67 commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
aaa8753
Introduce gossip-like discovery of master nodes
DaveCTurner Jul 20, 2018
321def8
No zen2 in request peers action
DaveCTurner Jul 31, 2018
8e9c881
Fix up discovery.find_peers_interval setting
DaveCTurner Jul 31, 2018
c870c94
Remove unused foundQuorumFrom
DaveCTurner Jul 31, 2018
686a434
protected
DaveCTurner Jul 31, 2018
83f7189
Inline one-use method
DaveCTurner Jul 31, 2018
1525607
We do not probe the local address so it does not need to be provided
DaveCTurner Jul 31, 2018
0b4e9d1
Fixup protected
DaveCTurner Jul 31, 2018
d524a62
Remove labelRunnable() and use an AbstractRunnable
DaveCTurner Jul 31, 2018
93ce793
Just created this, don't need another copy
DaveCTurner Jul 31, 2018
38ef542
Rename
DaveCTurner Jul 31, 2018
578a57f
Rename
DaveCTurner Jul 31, 2018
6ec315c
Not so private
DaveCTurner Jul 31, 2018
a933a7f
TODOs
DaveCTurner Jul 31, 2018
dddeecd
Renaming
DaveCTurner Jul 31, 2018
2177d0a
lastAcceptedNodes cannot change while the peerfinder is active, so ju…
DaveCTurner Aug 1, 2018
320ebdd
Do not need the local node to be in foundPeers, so remove it
DaveCTurner Aug 1, 2018
3712208
Rename discoveryNodes -> knownPeers in PeersRequest
DaveCTurner Aug 1, 2018
a566ffc
Rename candidateNodes to knownPeers in PeersResponse too
DaveCTurner Aug 1, 2018
5f12c03
Imports
DaveCTurner Aug 1, 2018
dcfa1ca
Private class
DaveCTurner Aug 1, 2018
53c1dff
Private
DaveCTurner Aug 1, 2018
0120029
Add assertion of received term
DaveCTurner Aug 1, 2018
3fab139
Start work on having the PeerFinder respond to PeersRequests directly
DaveCTurner Aug 1, 2018
4701d35
Fix assertion
DaveCTurner Aug 2, 2018
4b70b2e
Add test case for values in PeersResponse
DaveCTurner Aug 2, 2018
ffe6637
Test that it does delegate to onPeersRequestWhenInactive if inactive
DaveCTurner Aug 2, 2018
b219998
Also verify that it receives messages from the transport service
DaveCTurner Aug 2, 2018
52834a1
TODO resolved
DaveCTurner Aug 2, 2018
354c94e
Imports
DaveCTurner Aug 2, 2018
94cf9a3
Remove foundPeers and just track the nodes we've sent requests to
DaveCTurner Aug 2, 2018
c7bcae9
Prevent multiple concurrent attempts to connect to the same node
DaveCTurner Aug 2, 2018
7d2a5a6
Merge branch 'zen2' into 2018-07-20-peerfinder
DaveCTurner Aug 2, 2018
9e156df
Line length
DaveCTurner Aug 2, 2018
c805add
Remove unused class
DaveCTurner Aug 3, 2018
7aec464
Remove refs to probeLock
DaveCTurner Aug 3, 2018
33b17ce
connectTo is already async, no need to schedule it with the executorS…
DaveCTurner Aug 3, 2018
b00795c
Consolidate per-address logic into Peer class
DaveCTurner Aug 3, 2018
0e9850e
No need to track separate state, just need a boolean
DaveCTurner Aug 3, 2018
f466b44
Tweaks
DaveCTurner Aug 3, 2018
3c219de
Need moar coffee
DaveCTurner Aug 3, 2018
9356a26
Pass in leader when deactivating
DaveCTurner Aug 4, 2018
7afe75c
Remove comments re. synchronisation
DaveCTurner Aug 4, 2018
27e95cf
Rename callback
DaveCTurner Aug 4, 2018
5374f2b
Remove foundMasterNode boolean
DaveCTurner Aug 4, 2018
1308444
Assert discoveryNode not set
DaveCTurner Aug 4, 2018
b7c598b
Get the remote node again rather than passing it in
DaveCTurner Aug 4, 2018
417e6e2
Unwrap provided addresses
DaveCTurner Aug 4, 2018
4ca2e62
Extract ConfiguredHostsResolver class
DaveCTurner Aug 4, 2018
f034f18
Combine ActivePeerFinder and PeerFinder
DaveCTurner Aug 4, 2018
380d930
No need to expose isActive - this test is not helpful
DaveCTurner Aug 4, 2018
34b6fd4
Assert lifecycle is started
DaveCTurner Aug 4, 2018
98cd79a
No need for PeerFinder to be responsible for lifecycle of ConfiguredH…
DaveCTurner Aug 4, 2018
e3f9638
Add another test with failing address resolution
DaveCTurner Aug 4, 2018
253e2ea
Delete unused class
DaveCTurner Aug 4, 2018
5d97de7
Fix log message
DaveCTurner Aug 4, 2018
fb171e7
Assert no known peers as soon as deactivated
DaveCTurner Aug 4, 2018
8126e86
Private
DaveCTurner Aug 5, 2018
01e230f
Inline and rename
DaveCTurner Aug 5, 2018
8e3b175
Use AbstractRunnable and force execution
DaveCTurner Aug 5, 2018
4c52e50
Nullable
DaveCTurner Aug 6, 2018
121aad3
Rename and streamify
DaveCTurner Aug 6, 2018
c07c7d4
Reorder method
DaveCTurner Aug 6, 2018
1d4adcc
Rename request peers action
DaveCTurner Aug 6, 2018
f0e7155
Safe to wake up peers even if already deactivated
DaveCTurner Aug 6, 2018
c19b3df
Imports
DaveCTurner Aug 6, 2018
2051db6
Fix log message
DaveCTurner Aug 6, 2018
221253a
Oneliners
DaveCTurner Aug 6, 2018
e5414a1
Add discoveryNode to PeersFinder.Peer.toString() and remove from log …
DaveCTurner Aug 6, 2018
3608505
Remove TODO
DaveCTurner Aug 6, 2018
2013f10
Move PeerFinder machinery to discovery package
DaveCTurner Aug 6, 2018
8b605ad
Move ConfiguredHostsResolver interface into PeerFinder
DaveCTurner Aug 6, 2018
9aa9003
Logger usage
DaveCTurner Aug 6, 2018
b517ce9
Whitespace
DaveCTurner Aug 6, 2018
253a994
No need to refer to class name in log messages
DaveCTurner Aug 6, 2018
7948268
Can only deactivate an active PeerFinder
DaveCTurner Aug 6, 2018
b4df5f3
Revert "Can only deactivate an active PeerFinder"
DaveCTurner Aug 6, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,377 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.coordination;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably this should be org.elasticsearch.discovery. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, moved things around in 2013f10


import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.ConfiguredHostsResolver;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;

public abstract class PeerFinder extends AbstractComponent {

public static final String REQUEST_PEERS_ACTION_NAME = "internal:discovery/request_peers";

// the time between attempts to find all peers
public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_SETTING =
Setting.timeSetting("discovery.find_peers_interval",
TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

private final TimeValue findPeersDelay;

private final Object mutex = new Object();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new structure suggests that perhaps we don't need such coarse locking, but I do not think this is worth pursuing until we have some evidence that we need something finer. This is not very high-throughput code.

private final TransportService transportService;
private final FutureExecutor futureExecutor;
private final TransportAddressConnector transportAddressConnector;
private final ConfiguredHostsResolver configuredHostsResolver;

private volatile long currentTerm;
private boolean active;
private DiscoveryNodes lastAcceptedNodes;
private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
private Optional<DiscoveryNode> leader = Optional.empty();

PeerFinder(Settings settings, TransportService transportService, FutureExecutor futureExecutor,
TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) {
super(settings);
findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
this.transportService = transportService;
this.futureExecutor = futureExecutor;
this.transportAddressConnector = transportAddressConnector;
this.configuredHostsResolver = configuredHostsResolver;

transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
PeersRequest::new,
(request, channel, task) -> channel.sendResponse(handlePeersRequest(request)));
}

public void activate(final DiscoveryNodes lastAcceptedNodes) {
logger.trace("activating PeerFinder {}", lastAcceptedNodes);

synchronized (mutex) {
assert active == false;
active = true;
this.lastAcceptedNodes = lastAcceptedNodes;
leader = Optional.empty();
handleWakeUp();
}
}

public void deactivate(DiscoveryNode leader) {
synchronized (mutex) {
logger.trace("deactivating PeerFinder and setting leader to {}", leader);
active = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert active == false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests deactivate the PeerFinder at the end, regardless of whether it's active or not, and there's no particular reason to avoid double-deactivation or to make sure that each test ends with an active PeerFinder.

Also although I am pretty sure that we can't change the leader without being active, this is not something we guarantee, nor do I think we need to.

handleWakeUp();
this.leader = Optional.of(leader);
assert assertInactiveWithNoKnownPeers();
}
}

// exposed to subclasses for testing
protected final boolean holdsLock() {
return Thread.holdsLock(mutex);
}

boolean assertInactiveWithNoKnownPeers() {
assert active == false;
assert peersByAddress.isEmpty();
return true;
}

PeersResponse handlePeersRequest(PeersRequest peersRequest) {
synchronized (mutex) {
assert peersRequest.getSourceNode().equals(getLocalNode()) == false;
if (active) {
startProbe(peersRequest.getSourceNode().getAddress());
peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe);
return new PeersResponse(Optional.empty(), getFoundPeersUnderLock(), currentTerm);
} else {
return new PeersResponse(leader, Collections.emptyList(), currentTerm);
}
}
}

public void setCurrentTerm(long currentTerm) {
this.currentTerm = currentTerm;
}

private DiscoveryNode getLocalNode() {
final DiscoveryNode localNode = transportService.getLocalNode();
assert localNode != null;
return localNode;
}

/**
* Called on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join.
*/
protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term);

public interface TransportAddressConnector {
/**
* Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably means that we repeatedly establish (light-weight) connections to nodes in the unicast host pings list that are not master-eligible?

Why is the TransportAddressConnector implementation not part of the PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is benign, but there are things we could do in the TransportAddressConnector to avoid this. For instance, if we are already connected to a non-master node then there's no need to identify it via a lightweight connection; perhaps we could cache responses too.

This PR is large enough without another implementation and test suite - the real implementation can come in a followup.

*/
void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener);
}

public Iterable<DiscoveryNode> getFoundPeers() {
synchronized (mutex) {
return getFoundPeersUnderLock();
}
}

private List<DiscoveryNode> getFoundPeersUnderLock() {
assert active;
assert holdsLock() : "PeerFinder mutex not held";
return peersByAddress.values().stream().map(Peer::getDiscoveryNode).filter(Objects::nonNull).collect(Collectors.toList());
}

private Peer createConnectingPeer(TransportAddress transportAddress) {
Peer peer = new Peer(transportAddress);
peer.establishConnection();
return peer;
}

private void handleWakeUp() {
assert holdsLock() : "PeerFinder mutex not held";

for (final Peer peer : peersByAddress.values()) {
peer.handleWakeUp();
}

if (active == false) {
logger.trace("PeerFinder: not running");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we even need to say that this is PeerFinder? We have a logger for PeerFinder that already spits out the class name (same thing for other logging occurrences in this class).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course. I pushed 7948268.

return;
}

for (ObjectCursor<DiscoveryNode> discoveryNodeObjectCursor : lastAcceptedNodes.getMasterNodes().values()) {
startProbe(discoveryNodeObjectCursor.value.getAddress());
}

configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> {
synchronized (mutex) {
logger.trace("PeerFinder: probing resolved transport addresses {}", providedAddresses);
providedAddresses.forEach(this::startProbe);
}
});

futureExecutor.schedule(new AbstractRunnable() {
@Override
public boolean isForceExecution() {
return true;
}

@Override
public void onFailure(Exception e) {
assert false : e;
logger.debug("unexpected exception in PeerFinder wakeup", e);
}

@Override
protected void doRun() {
synchronized (mutex) {
handleWakeUp();
}
}

@Override
public String toString() {
return "PeerFinder::handleWakeUp";
}
}, findPeersDelay);
}

private void startProbe(TransportAddress transportAddress) {
assert holdsLock() : "PeerFinder mutex not held";
if (active == false) {
logger.trace("startProbe({}) not running", transportAddress);
return;
}

if (transportAddress.equals(getLocalNode().getAddress())) {
logger.trace("startProbe({}) not probing local node", transportAddress);
return;
}

peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer);
}

private class Peer {
private final TransportAddress transportAddress;
private SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>();
private volatile boolean peersRequestInFlight;

Peer(TransportAddress transportAddress) {
this.transportAddress = transportAddress;
}

@Nullable
DiscoveryNode getDiscoveryNode() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Nullable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See 4c52e50

return discoveryNode.get();
}

void handleWakeUp() {
assert holdsLock() : "PeerFinder mutex not held";

if (active == false) {
removePeer();
return;
}

final DiscoveryNode discoveryNode = getDiscoveryNode();
// may be null if connection not yet established

if (discoveryNode != null) {
if (transportService.nodeConnected(discoveryNode)) {
if (peersRequestInFlight == false) {
requestPeers();
}
} else {
logger.trace("{} no longer connected to {}", this, discoveryNode);
removePeer();
}
}
}

void establishConnection() {
assert holdsLock() : "PeerFinder mutex not held";
assert getDiscoveryNode() == null : "unexpectedly connected to " + getDiscoveryNode();
assert active;

logger.trace("{} attempting connection", this);
transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener<DiscoveryNode>() {
@Override
public void onResponse(DiscoveryNode remoteNode) {
assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible";
assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node";
synchronized (mutex) {
if (active) {
assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get();
discoveryNode.set(remoteNode);
requestPeers();
}
}
}

@Override
public void onFailure(Exception e) {
logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e);
removePeer();
}
});
}

private void removePeer() {
final Peer removed = peersByAddress.remove(transportAddress);
assert removed == Peer.this;
}

private void requestPeers() {
assert holdsLock() : "PeerFinder mutex not held";
assert peersRequestInFlight == false : "PeersRequest already in flight";
assert active;

final DiscoveryNode discoveryNode = getDiscoveryNode();
assert discoveryNode != null : "cannot request peers without first connecting";

logger.trace("{} requesting peers from {}", this, discoveryNode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for the from {} with the proposed toString change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, see e5414a1

peersRequestInFlight = true;

List<DiscoveryNode> knownNodes = getFoundPeersUnderLock();

transportService.sendRequest(discoveryNode, REQUEST_PEERS_ACTION_NAME,
new PeersRequest(getLocalNode(), knownNodes),
new TransportResponseHandler<PeersResponse>() {

@Override
public PeersResponse read(StreamInput in) throws IOException {
return new PeersResponse(in);
}

@Override
public void handleResponse(PeersResponse response) {
logger.trace("{} received {} from {}", Peer.this, response, discoveryNode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with discoveryNode added to Peer.toString, no need to add discoveryNode here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, see e5414a1

synchronized (mutex) {
if (active == false) {
return;
}

peersRequestInFlight = false;

if (response.getMasterNode().isPresent()) {
final DiscoveryNode masterNode = response.getMasterNode().get();
if (masterNode.equals(discoveryNode) == false) {
startProbe(masterNode.getAddress());
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should even make this distinction. I think the following should suffice:

response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe);
response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's all about the one-liners today. See 221253a.

response.getKnownPeers().stream().map(DiscoveryNode::getAddress)
.forEach(PeerFinder.this::startProbe);
}
}

if (response.getMasterNode().equals(Optional.of(discoveryNode))) {
// Must not hold lock here to avoid deadlock
assert holdsLock() == false : "PeerFinder mutex is held in error";
onActiveMasterFound(discoveryNode, response.getTerm());
}
}

@Override
public void handleException(TransportException exp) {
peersRequestInFlight = false;
logger.debug("PeersRequest failed", exp);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add Peer.this to message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, see e5414a1

}

@Override
public String executor() {
return Names.GENERIC;
}
});
}

@Override
public String toString() {
return "Peer{" + transportAddress + " peersRequestInFlight=" + peersRequestInFlight + "}";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add discoveryNode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, see e5414a1

}
}
}
Loading