diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java index f225ca4093..1b87a3fa91 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetwork.java @@ -135,56 +135,44 @@ public class DefaultP2PNetwork implements P2PNetwork { private static final Logger LOG = LogManager.getLogger(); private static final int TIMEOUT_SECONDS = 30; + private ChannelFuture server; + private final EventLoopGroup boss = new NioEventLoopGroup(1); + private final EventLoopGroup workers = new NioEventLoopGroup(1); private final ScheduledExecutorService peerConnectionScheduler = Executors.newSingleThreadScheduledExecutor(); - - final Map>> protocolCallbacks = - new ConcurrentHashMap<>(); - - private final Subscribers> connectCallbacks = new Subscribers<>(); - - private final Subscribers disconnectCallbacks = new Subscribers<>(); - - private final Callbacks callbacks = new Callbacks(protocolCallbacks, disconnectCallbacks); - private final PeerDiscoveryAgent peerDiscoveryAgent; - private final PeerBlacklist peerBlacklist; + private final NetworkingConfiguration config; private final List supportedCapabilities; - private OptionalLong peerBondedObserverId = OptionalLong.empty(); - private OptionalLong peerDroppedObserverId = OptionalLong.empty(); - - @VisibleForTesting public final Collection peerMaintainConnectionList; - - @VisibleForTesting public final PeerConnectionRegistry connections; - - @VisibleForTesting - public final Map> pendingConnections = - new ConcurrentHashMap<>(); - - private final EventLoopGroup boss = new NioEventLoopGroup(1); - - private final EventLoopGroup workers = new NioEventLoopGroup(1); - - private volatile PeerInfo ourPeerInfo; - - private final SECP256K1.KeyPair keyPair; - - private ChannelFuture server; - private final int maxPeers; - private final List subProtocols; - private final LabelledMetric outboundMessagesCounter; - - private final String advertisedHost; - + private final SECP256K1.KeyPair keyPair; + private final BytesValue nodeId; + private volatile OptionalInt listeningPort = OptionalInt.empty(); private volatile Optional localEnode = Optional.empty(); + private volatile Optional ourPeerInfo = Optional.empty(); + private final PeerBlacklist peerBlacklist; private final Optional nodePermissioningController; private final Optional blockchain; + + @VisibleForTesting final Collection peerMaintainConnectionList; + @VisibleForTesting final PeerConnectionRegistry connections; + + @VisibleForTesting + final Map> pendingConnections = new ConcurrentHashMap<>(); + + final Map>> protocolCallbacks = + new ConcurrentHashMap<>(); + private final Subscribers> connectCallbacks = new Subscribers<>(); + private final Subscribers disconnectCallbacks = new Subscribers<>(); + private final Callbacks callbacks = new Callbacks(protocolCallbacks, disconnectCallbacks); + + private final LabelledMetric outboundMessagesCounter; private OptionalLong blockAddedObserverId = OptionalLong.empty(); + private OptionalLong peerBondedObserverId = OptionalLong.empty(); + private OptionalLong peerDroppedObserverId = OptionalLong.empty(); private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false); @@ -225,8 +213,8 @@ public class DefaultP2PNetwork implements P2PNetwork { this.peerMaintainConnectionList = new HashSet<>(); this.connections = new PeerConnectionRegistry(metricsSystem); + this.nodeId = this.keyPair.getPublicKey().getEncodedBytes(); this.subProtocols = config.getSupportedProtocols(); - this.advertisedHost = config.getDiscovery().getAdvertisedHost(); this.maxPeers = config.getRlpx().getMaxPeers(); peerDiscoveryAgent.addPeerRequirement(() -> connections.size() >= maxPeers); @@ -271,8 +259,12 @@ private Supplier pendingTaskCounter(final EventLoopGroup eventLoopGroup .sum(); } - /** Start listening for incoming connections */ - private void startListening() { + /** + * Start listening for incoming connections. + * + * @return The port on which we're listening for incoming connections. + */ + private int startListening() { server = new ServerBootstrap() .group(boss, workers) @@ -293,13 +285,15 @@ private void startListening() { LOG.error(message, future.cause()); } checkState(socketAddress != null, message); + listeningPort = OptionalInt.of(socketAddress.getPort()); ourPeerInfo = - new PeerInfo( - 5, - config.getClientId(), - supportedCapabilities, - socketAddress.getPort(), - this.keyPair.getPublicKey().getEncodedBytes()); + Optional.of( + new PeerInfo( + 5, + config.getClientId(), + supportedCapabilities, + listeningPort.getAsInt(), + nodeId)); LOG.info("P2PNetwork started and listening on {}", socketAddress); latch.countDown(); }); @@ -309,6 +303,7 @@ private void startListening() { if (!latch.await(1, TimeUnit.MINUTES)) { throw new RuntimeException("Timed out while waiting for network startup"); } + return listeningPort.getAsInt(); } catch (final InterruptedException e) { throw new RuntimeException("Interrupted before startup completed", e); } @@ -332,7 +327,7 @@ protected void initChannel(final SocketChannel ch) { new HandshakeHandlerInbound( keyPair, subProtocols, - ourPeerInfo, + ourPeerInfo.get(), connectionFuture, callbacks, connections, @@ -437,8 +432,14 @@ public Stream streamDiscoveredPeers() { @Override public CompletableFuture connect(final Peer peer) { - LOG.trace("Initiating connection to peer: {}", peer.getId()); final CompletableFuture connectionFuture = new CompletableFuture<>(); + if (!localEnode.isPresent()) { + connectionFuture.completeExceptionally( + new IllegalStateException("Attempt to connect to peer before network is ready")); + return connectionFuture; + } + + LOG.trace("Initiating connection to peer: {}", peer.getId()); final EnodeURL enode = peer.getEnodeURL(); final CompletableFuture existingPendingConnection = pendingConnections.putIfAbsent(peer, connectionFuture); @@ -471,7 +472,7 @@ protected void initChannel(final SocketChannel ch) { keyPair, peer, subProtocols, - ourPeerInfo, + ourPeerInfo.get(), connectionFuture, callbacks, connections, @@ -528,8 +529,8 @@ public void start() { LOG.warn("Attempted to start an already started " + getClass().getSimpleName()); } - startListening(); - peerDiscoveryAgent.start(ourPeerInfo.getPort()).join(); + final int listeningPort = startListening(); + peerDiscoveryAgent.start(listeningPort).join(); peerBondedObserverId = OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent())); peerDroppedObserverId = @@ -609,22 +610,22 @@ private boolean isPeerAllowed(final Peer peer) { } private boolean isPeerAllowed(final EnodeURL enode) { + final Optional maybeEnode = getLocalEnode(); + if (!maybeEnode.isPresent()) { + // If the network isn't ready yet, deny connections + return false; + } + final EnodeURL localEnode = maybeEnode.get(); + if (peerBlacklist.contains(enode.getNodeId())) { return false; } - if (enode.getNodeId().equals(ourPeerInfo.getNodeId())) { + if (enode.getNodeId().equals(nodeId)) { // Peer matches our node id return false; } - Optional maybeEnode = getLocalEnode(); - if (!maybeEnode.isPresent()) { - // If local enode isn't yet available we can't evaluate permissions - return false; - } - return nodePermissioningController - .map(c -> c.isPermitted(maybeEnode.get(), enode)) - .orElse(true); + return nodePermissioningController.map(c -> c.isPermitted(localEnode, enode)).orElse(true); } @VisibleForTesting @@ -701,12 +702,10 @@ public Optional getLocalEnode() { } private void createLocalEnode() { - if (localEnode.isPresent()) { + if (localEnode.isPresent() || !listeningPort.isPresent()) { return; } - final BytesValue nodeId = ourPeerInfo.getNodeId(); - final int listeningPort = ourPeerInfo.getPort(); final OptionalInt discoveryPort = peerDiscoveryAgent .getAdvertisedPeer() @@ -718,8 +717,8 @@ private void createLocalEnode() { final EnodeURL localEnode = EnodeURL.builder() .nodeId(nodeId) - .ipAddress(advertisedHost) - .listeningPort(listeningPort) + .ipAddress(config.getDiscovery().getAdvertisedHost()) + .listeningPort(listeningPort.getAsInt()) .discoveryPort(discoveryPort) .build(); diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java index cea88e0e94..c6e0a2a371 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/network/DefaultP2PNetworkTest.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.ethereum.p2p.network; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Java6Assertions.assertThatThrownBy; import static org.assertj.core.api.Java6Assertions.catchThrowable; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; @@ -112,6 +113,17 @@ public void addingMaintainedNetworkPeerStartsConnection() { verify(network, times(1)).connect(peer); } + @Test + public void addMaintainConnectionPeer_beforeStartingNetwork() { + final DefaultP2PNetwork network = mockNetwork(); + final Peer peer = mockPeer(); + + assertThat(network.addMaintainConnectionPeer(peer)).isTrue(); + + assertThat(network.peerMaintainConnectionList).contains(peer); + verify(network, never()).connect(peer); + } + @Test public void addingRepeatMaintainedPeersReturnsFalse() { final P2PNetwork network = network(); @@ -194,6 +206,7 @@ public void shouldSendClientQuittingWhenNetworkStops() { @Test public void shouldntAttemptNewConnectionToPendingPeer() { final P2PNetwork network = network(); + network.start(); final Peer peer = mockPeer(); final CompletableFuture connectingFuture = network.connect(peer); @@ -477,6 +490,18 @@ public void attemptPeerConnections_withNoSlotsAvailable() { verify(network, times(0)).connect(any()); } + @Test + public void connect_beforeStartingNetwork() { + final DefaultP2PNetwork network = network(); + final Peer peer = mockPeer(); + + final CompletableFuture connectionResult = network.connect(peer); + assertThat(connectionResult).isCompletedExceptionally(); + assertThatThrownBy(connectionResult::get) + .hasCauseInstanceOf(IllegalStateException.class) + .hasMessageContaining("Attempt to connect to peer before network is ready"); + } + private DiscoveryPeer createDiscoveryPeer() { return createDiscoveryPeer(Peer.randomId(), 999); }