Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Fix p2p PeerInfo handling #1428

Merged
merged 7 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,56 +135,44 @@ public class DefaultP2PNetwork implements P2PNetwork {
private static final Logger LOG = LogManager.getLogger();
private static final int TIMEOUT_SECONDS = 30;

private ChannelFuture server;
private final EventLoopGroup boss = new NioEventLoopGroup(1);
private final EventLoopGroup workers = new NioEventLoopGroup(1);
private final ScheduledExecutorService peerConnectionScheduler =
Executors.newSingleThreadScheduledExecutor();

final Map<Capability, Subscribers<Consumer<Message>>> protocolCallbacks =
new ConcurrentHashMap<>();

private final Subscribers<Consumer<PeerConnection>> connectCallbacks = new Subscribers<>();

private final Subscribers<DisconnectCallback> disconnectCallbacks = new Subscribers<>();

private final Callbacks callbacks = new Callbacks(protocolCallbacks, disconnectCallbacks);

private final PeerDiscoveryAgent peerDiscoveryAgent;
private final PeerBlacklist peerBlacklist;

private final NetworkingConfiguration config;
private final List<Capability> supportedCapabilities;
private OptionalLong peerBondedObserverId = OptionalLong.empty();
private OptionalLong peerDroppedObserverId = OptionalLong.empty();

@VisibleForTesting public final Collection<Peer> peerMaintainConnectionList;

@VisibleForTesting public final PeerConnectionRegistry connections;

@VisibleForTesting
public final Map<Peer, CompletableFuture<PeerConnection>> pendingConnections =
new ConcurrentHashMap<>();

private final EventLoopGroup boss = new NioEventLoopGroup(1);

private final EventLoopGroup workers = new NioEventLoopGroup(1);

private volatile PeerInfo ourPeerInfo;

private final SECP256K1.KeyPair keyPair;

private ChannelFuture server;

private final int maxPeers;

private final List<SubProtocol> subProtocols;

private final LabelledMetric<Counter> outboundMessagesCounter;

private final String advertisedHost;

private final SECP256K1.KeyPair keyPair;
private final BytesValue nodeId;
private volatile OptionalInt listeningPort = OptionalInt.empty();
private volatile Optional<EnodeURL> localEnode = Optional.empty();
private volatile Optional<PeerInfo> ourPeerInfo = Optional.empty();

private final PeerBlacklist peerBlacklist;
private final Optional<NodePermissioningController> nodePermissioningController;
private final Optional<Blockchain> blockchain;

@VisibleForTesting final Collection<Peer> peerMaintainConnectionList;
@VisibleForTesting final PeerConnectionRegistry connections;

@VisibleForTesting
final Map<Peer, CompletableFuture<PeerConnection>> pendingConnections = new ConcurrentHashMap<>();

final Map<Capability, Subscribers<Consumer<Message>>> protocolCallbacks =
new ConcurrentHashMap<>();
private final Subscribers<Consumer<PeerConnection>> connectCallbacks = new Subscribers<>();
private final Subscribers<DisconnectCallback> disconnectCallbacks = new Subscribers<>();
private final Callbacks callbacks = new Callbacks(protocolCallbacks, disconnectCallbacks);

private final LabelledMetric<Counter> outboundMessagesCounter;
private OptionalLong blockAddedObserverId = OptionalLong.empty();
private OptionalLong peerBondedObserverId = OptionalLong.empty();
private OptionalLong peerDroppedObserverId = OptionalLong.empty();

private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
Expand Down Expand Up @@ -225,8 +213,8 @@ public class DefaultP2PNetwork implements P2PNetwork {
this.peerMaintainConnectionList = new HashSet<>();
this.connections = new PeerConnectionRegistry(metricsSystem);

this.nodeId = this.keyPair.getPublicKey().getEncodedBytes();
this.subProtocols = config.getSupportedProtocols();
this.advertisedHost = config.getDiscovery().getAdvertisedHost();
this.maxPeers = config.getRlpx().getMaxPeers();

peerDiscoveryAgent.addPeerRequirement(() -> connections.size() >= maxPeers);
Expand Down Expand Up @@ -271,8 +259,12 @@ private Supplier<Integer> pendingTaskCounter(final EventLoopGroup eventLoopGroup
.sum();
}

/** Start listening for incoming connections */
private void startListening() {
/**
* Start listening for incoming connections.
*
* @return The port on which we're listening for incoming connections.
*/
private int startListening() {
server =
new ServerBootstrap()
.group(boss, workers)
Expand All @@ -293,13 +285,15 @@ private void startListening() {
LOG.error(message, future.cause());
}
checkState(socketAddress != null, message);
listeningPort = OptionalInt.of(socketAddress.getPort());
ourPeerInfo =
new PeerInfo(
5,
config.getClientId(),
supportedCapabilities,
socketAddress.getPort(),
this.keyPair.getPublicKey().getEncodedBytes());
Optional.of(
new PeerInfo(
5,
config.getClientId(),
supportedCapabilities,
listeningPort.getAsInt(),
nodeId));
LOG.info("P2PNetwork started and listening on {}", socketAddress);
latch.countDown();
});
Expand All @@ -309,6 +303,7 @@ private void startListening() {
if (!latch.await(1, TimeUnit.MINUTES)) {
throw new RuntimeException("Timed out while waiting for network startup");
}
return listeningPort.getAsInt();
} catch (final InterruptedException e) {
throw new RuntimeException("Interrupted before startup completed", e);
}
Expand All @@ -332,7 +327,7 @@ protected void initChannel(final SocketChannel ch) {
new HandshakeHandlerInbound(
keyPair,
subProtocols,
ourPeerInfo,
ourPeerInfo.get(),
connectionFuture,
callbacks,
connections,
Expand Down Expand Up @@ -437,8 +432,14 @@ public Stream<DiscoveryPeer> streamDiscoveredPeers() {

@Override
public CompletableFuture<PeerConnection> connect(final Peer peer) {
LOG.trace("Initiating connection to peer: {}", peer.getId());
final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>();
if (!localEnode.isPresent()) {
connectionFuture.completeExceptionally(
new IllegalStateException("Attempt to connect to peer before network is ready"));
return connectionFuture;
}

LOG.trace("Initiating connection to peer: {}", peer.getId());
final EnodeURL enode = peer.getEnodeURL();
final CompletableFuture<PeerConnection> existingPendingConnection =
pendingConnections.putIfAbsent(peer, connectionFuture);
Expand Down Expand Up @@ -471,7 +472,7 @@ protected void initChannel(final SocketChannel ch) {
keyPair,
peer,
subProtocols,
ourPeerInfo,
ourPeerInfo.get(),
connectionFuture,
callbacks,
connections,
Expand Down Expand Up @@ -528,8 +529,8 @@ public void start() {
LOG.warn("Attempted to start an already started " + getClass().getSimpleName());
}

startListening();
peerDiscoveryAgent.start(ourPeerInfo.getPort()).join();
final int listeningPort = startListening();
peerDiscoveryAgent.start(listeningPort).join();
peerBondedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent()));
peerDroppedObserverId =
Expand Down Expand Up @@ -609,22 +610,22 @@ private boolean isPeerAllowed(final Peer peer) {
}

private boolean isPeerAllowed(final EnodeURL enode) {
final Optional<EnodeURL> maybeEnode = getLocalEnode();
if (!maybeEnode.isPresent()) {
// If the network isn't ready yet, deny connections
return false;
}
final EnodeURL localEnode = maybeEnode.get();

if (peerBlacklist.contains(enode.getNodeId())) {
return false;
}
if (enode.getNodeId().equals(ourPeerInfo.getNodeId())) {
if (enode.getNodeId().equals(nodeId)) {
// Peer matches our node id
return false;
}

Optional<EnodeURL> maybeEnode = getLocalEnode();
if (!maybeEnode.isPresent()) {
// If local enode isn't yet available we can't evaluate permissions
return false;
}
return nodePermissioningController
.map(c -> c.isPermitted(maybeEnode.get(), enode))
.orElse(true);
return nodePermissioningController.map(c -> c.isPermitted(localEnode, enode)).orElse(true);
}

@VisibleForTesting
Expand Down Expand Up @@ -701,12 +702,10 @@ public Optional<EnodeURL> getLocalEnode() {
}

private void createLocalEnode() {
if (localEnode.isPresent()) {
if (localEnode.isPresent() || !listeningPort.isPresent()) {
return;
}

final BytesValue nodeId = ourPeerInfo.getNodeId();
final int listeningPort = ourPeerInfo.getPort();
final OptionalInt discoveryPort =
peerDiscoveryAgent
.getAdvertisedPeer()
Expand All @@ -718,8 +717,8 @@ private void createLocalEnode() {
final EnodeURL localEnode =
EnodeURL.builder()
.nodeId(nodeId)
.ipAddress(advertisedHost)
.listeningPort(listeningPort)
.ipAddress(config.getDiscovery().getAdvertisedHost())
.listeningPort(listeningPort.getAsInt())
.discoveryPort(discoveryPort)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.p2p.network;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Java6Assertions.assertThatThrownBy;
import static org.assertj.core.api.Java6Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down Expand Up @@ -112,6 +113,17 @@ public void addingMaintainedNetworkPeerStartsConnection() {
verify(network, times(1)).connect(peer);
}

@Test
public void addMaintainConnectionPeer_beforeStartingNetwork() {
final DefaultP2PNetwork network = mockNetwork();
final Peer peer = mockPeer();

assertThat(network.addMaintainConnectionPeer(peer)).isTrue();

assertThat(network.peerMaintainConnectionList).contains(peer);
verify(network, never()).connect(peer);
}

@Test
public void addingRepeatMaintainedPeersReturnsFalse() {
final P2PNetwork network = network();
Expand Down Expand Up @@ -194,6 +206,7 @@ public void shouldSendClientQuittingWhenNetworkStops() {
@Test
public void shouldntAttemptNewConnectionToPendingPeer() {
final P2PNetwork network = network();
network.start();
final Peer peer = mockPeer();

final CompletableFuture<PeerConnection> connectingFuture = network.connect(peer);
Expand Down Expand Up @@ -477,6 +490,18 @@ public void attemptPeerConnections_withNoSlotsAvailable() {
verify(network, times(0)).connect(any());
}

@Test
public void connect_beforeStartingNetwork() {
final DefaultP2PNetwork network = network();
final Peer peer = mockPeer();

final CompletableFuture<PeerConnection> connectionResult = network.connect(peer);
assertThat(connectionResult).isCompletedExceptionally();
assertThatThrownBy(connectionResult::get)
.hasCauseInstanceOf(IllegalStateException.class)
.hasMessageContaining("Attempt to connect to peer before network is ready");
}

private DiscoveryPeer createDiscoveryPeer() {
return createDiscoveryPeer(Peer.randomId(), 999);
}
Expand Down