Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PAN-2730] Create MaintainedPeers class #1484

Merged
merged 4 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.network;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

Expand All @@ -35,6 +34,7 @@
import tech.pegasys.pantheon.ethereum.p2p.network.netty.PeerConnectionRegistry;
import tech.pegasys.pantheon.ethereum.p2p.network.netty.TimeoutHandler;
import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer;
import tech.pegasys.pantheon.ethereum.p2p.peers.MaintainedPeers;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissions;
import tech.pegasys.pantheon.ethereum.p2p.permissions.PeerPermissionsBlacklist;
Expand All @@ -56,7 +56,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -158,7 +157,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
private final PeerPermissions peerPermissions;
private volatile Optional<PeerRlpxPermissions> rlpxPermissions = Optional.empty();

@VisibleForTesting final Collection<Peer> peerMaintainConnectionList;
private final MaintainedPeers maintainedPeers;
@VisibleForTesting final PeerConnectionRegistry connections;

@VisibleForTesting
Expand Down Expand Up @@ -196,13 +195,14 @@ public class DefaultP2PNetwork implements P2PNetwork {
final NetworkingConfiguration config,
final List<Capability> supportedCapabilities,
final PeerPermissions peerPermissions,
final MaintainedPeers maintainedPeers,
final MetricsSystem metricsSystem) {

this.peerDiscoveryAgent = peerDiscoveryAgent;
this.keyPair = keyPair;
this.config = config;
this.supportedCapabilities = supportedCapabilities;
this.peerMaintainConnectionList = new HashSet<>();
this.maintainedPeers = maintainedPeers;
this.connections = new PeerConnectionRegistry(metricsSystem);

this.nodeId = this.keyPair.getPublicKey().getEncodedBytes();
Expand Down Expand Up @@ -358,46 +358,19 @@ protected void initChannel(final SocketChannel ch) {

@Override
public boolean addMaintainConnectionPeer(final Peer peer) {
checkArgument(
peer.getEnodeURL().isListening(),
"Invalid enode url. Enode url must contain a non-zero listening port.");
final boolean added = peerMaintainConnectionList.add(peer);
final boolean allowConnection =
rlpxPermissions.isPresent() && rlpxPermissions.get().allowNewOutboundConnectionTo(peer);
if (allowConnection && !isConnectingOrConnected(peer)) {
// Connect immediately if appropriate
connect(peer);
}

return added;
return maintainedPeers.add(peer);
}

@Override
public boolean removeMaintainedConnectionPeer(final Peer peer) {
final boolean removed = peerMaintainConnectionList.remove(peer);

final CompletableFuture<PeerConnection> connectionFuture = pendingConnections.get(peer);
if (connectionFuture != null) {
connectionFuture.thenAccept(connection -> connection.disconnect(DisconnectReason.REQUESTED));
}

final Optional<PeerConnection> peerConnection = connections.getConnectionForPeer(peer.getId());
peerConnection.ifPresent(pc -> pc.disconnect(DisconnectReason.REQUESTED));

peerDiscoveryAgent.dropPeer(peer);

return removed;
return maintainedPeers.remove(peer);
}

void checkMaintainedConnectionPeers() {
if (!rlpxPermissions.isPresent()) {
return;
}
final PeerRlpxPermissions permissions = rlpxPermissions.get();
peerMaintainConnectionList.stream()
.filter(p -> !isConnectingOrConnected(p))
.filter(permissions::allowNewOutboundConnectionTo)
.forEach(this::connect);
maintainedPeers.streamPeers().forEach(this::connect);
}

@VisibleForTesting
Expand Down Expand Up @@ -459,20 +432,35 @@ public CompletableFuture<PeerConnection> connect(final Peer peer) {
return connectionFuture;
}

LOG.trace("Initiating connection to peer: {}", peer.getId());
// Check for existing connection
final Optional<PeerConnection> existingConnection =
connections.getConnectionForPeer(peer.getId());
if (existingConnection.isPresent()) {
connectionFuture.complete(existingConnection.get());
return connectionFuture;
}
// Check for existing pending connection
final CompletableFuture<PeerConnection> existingPendingConnection =
pendingConnections.putIfAbsent(peer, connectionFuture);
if (existingPendingConnection != null) {
LOG.debug("Attempted to connect to peer with pending connection: {}", peer.getId());
return existingPendingConnection;
}

initiateOutboundConnection(peer, connectionFuture);
return connectionFuture;
}

@VisibleForTesting
void initiateOutboundConnection(
final Peer peer, final CompletableFuture<PeerConnection> connectionFuture) {
LOG.trace("Initiating connection to peer: {}", peer.getId());
final EnodeURL enode = peer.getEnodeURL();
if (!enode.isListening()) {
final String errorMsg =
"Attempt to connect to peer with no listening port: " + enode.toString();
LOG.warn(errorMsg);
connectionFuture.completeExceptionally(new IllegalStateException(errorMsg));
return connectionFuture;
connectionFuture.completeExceptionally(new IllegalArgumentException(errorMsg));
return;
}

if (peer instanceof DiscoveryPeer) {
Expand Down Expand Up @@ -528,7 +516,6 @@ protected void initChannel(final SocketChannel ch) {
}
logConnections();
});
return connectionFuture;
}

private void logConnections() {
Expand Down Expand Up @@ -567,12 +554,32 @@ public void start() {
final Peer ourNode = createLocalNode();
this.rlpxPermissions = Optional.of(new PeerRlpxPermissions(ourNode, peerPermissions));

this.maintainedPeers.subscribeAdd(this::handleMaintainedPeerAdded);
this.maintainedPeers.subscribeRemove(this::handleMaintainedPeerRemoved);

peerConnectionScheduler.scheduleWithFixedDelay(
this::checkMaintainedConnectionPeers, 2, 60, TimeUnit.SECONDS);
peerConnectionScheduler.scheduleWithFixedDelay(
this::attemptPeerConnections, 30, 30, TimeUnit.SECONDS);
}

private void handleMaintainedPeerRemoved(final Peer peer, final boolean wasRemoved) {
// Drop peer from peer table
peerDiscoveryAgent.dropPeer(peer);

// Disconnect if connected or connecting
final CompletableFuture<PeerConnection> connectionFuture = pendingConnections.get(peer);
if (connectionFuture != null) {
connectionFuture.thenAccept(connection -> connection.disconnect(DisconnectReason.REQUESTED));
}
final Optional<PeerConnection> peerConnection = connections.getConnectionForPeer(peer.getId());
peerConnection.ifPresent(pc -> pc.disconnect(DisconnectReason.REQUESTED));
}

private void handleMaintainedPeerAdded(final Peer peer, final boolean wasAdded) {
this.connect(peer);
}

@VisibleForTesting
Consumer<PeerBondedEvent> handlePeerBondedEvent() {
return event -> {
Expand Down Expand Up @@ -626,10 +633,6 @@ boolean isConnected(final Peer peer) {
return connections.isAlreadyConnected(peer.getId());
}

private boolean isConnectingOrConnected(final Peer peer) {
return isConnected(peer) || isConnecting(peer);
}

@Override
public void stop() {
if (!this.started.get() || !stopped.compareAndSet(false, true)) {
Expand Down Expand Up @@ -728,6 +731,7 @@ public static class Builder {
private Optional<NodePermissioningController> nodePermissioningController = Optional.empty();
private Blockchain blockchain = null;
private Vertx vertx;
private MaintainedPeers maintainedPeers = new MaintainedPeers();

public P2PNetwork build() {
validate();
Expand All @@ -750,6 +754,7 @@ private P2PNetwork doBuild() {
config,
supportedCapabilities,
peerPermissions,
maintainedPeers,
metricsSystem);
}

Expand Down Expand Up @@ -829,5 +834,11 @@ public Builder blockchain(final Blockchain blockchain) {
this.blockchain = blockchain;
return this;
}

public Builder maintainedPeers(final MaintainedPeers maintainedPeers) {
checkNotNull(maintainedPeers);
this.maintainedPeers = maintainedPeers;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.peers;

import static com.google.common.base.Preconditions.checkArgument;

import tech.pegasys.pantheon.util.Subscribers;

import java.util.Set;
import java.util.stream.Stream;

import io.vertx.core.impl.ConcurrentHashSet;

/** Represents a set of peers for which connections should be actively maintained. */
public class MaintainedPeers {
private final Set<Peer> maintainedPeers = new ConcurrentHashSet<>();
private final Subscribers<PeerAddedCallback> addedSubscribers = new Subscribers<>();
private final Subscribers<PeerRemovedCallback> removedCallbackSubscribers = new Subscribers<>();

public boolean add(final Peer peer) {
checkArgument(
peer.getEnodeURL().isListening(),
"Invalid enode url. Enode url must contain a non-zero listening port.");
boolean wasAdded = maintainedPeers.add(peer);
addedSubscribers.forEach(s -> s.onPeerAdded(peer, wasAdded));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not invoking subscribers in parallel ? Since there is no particular check in the foreach loop it is doable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this as-is for now because I don't want to get into changing the Subscribers interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have too.
I was just suggesting to do : addedSubscribers.parallelStream().forEach(s -> s.onPeerAdded(peer, wasAdded)); instead of addedSubscribers.forEach(s -> s.onPeerAdded(peer, wasAdded));

return wasAdded;
}

public boolean remove(final Peer peer) {
boolean wasRemoved = maintainedPeers.remove(peer);
removedCallbackSubscribers.forEach(s -> s.onPeerRemoved(peer, wasRemoved));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as previous.

return wasRemoved;
}

public boolean contains(final Peer peer) {
return maintainedPeers.contains(peer);
}

public int size() {
return maintainedPeers.size();
}

public void subscribeAdd(final PeerAddedCallback callback) {
addedSubscribers.subscribe(callback);
}

public void subscribeRemove(final PeerRemovedCallback callback) {
removedCallbackSubscribers.subscribe(callback);
}

public Stream<Peer> streamPeers() {
return maintainedPeers.stream();
}

@FunctionalInterface
public interface PeerAddedCallback {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FunctionalInterface ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call - done

void onPeerAdded(Peer peer, boolean wasAdded);
}

@FunctionalInterface
public interface PeerRemovedCallback {
void onPeerRemoved(Peer peer, boolean wasRemoved);
}
}
Loading