diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java index 7b53f389fc..0a54b12a0f 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java @@ -408,7 +408,7 @@ void bond(final DiscoveryPeer peer) { // The filter condition will be updated as soon as the action is performed. final PeerInteractionState ping = - new PeerInteractionState(action, PacketType.PONG, (packet) -> false, true); + new PeerInteractionState(action, peer.getId(), PacketType.PONG, (packet) -> false, true); dispatchInteraction(peer, ping); } @@ -441,7 +441,7 @@ private void findNodes(final DiscoveryPeer peer, final BytesValue target) { sendPacket(peer, PacketType.FIND_NEIGHBORS, data); }; final PeerInteractionState interaction = - new PeerInteractionState(action, PacketType.NEIGHBORS, packet -> true, true); + new PeerInteractionState(action, peer.getId(), PacketType.NEIGHBORS, packet -> true, true); dispatchInteraction(peer, interaction); } @@ -459,7 +459,7 @@ private void dispatchInteraction(final Peer peer, final PeerInteractionState sta if (previous != null) { previous.cancelTimers(); } - state.execute(0); + state.execute(0, 0); } private void respondToPing( @@ -499,11 +499,15 @@ public void setRetryDelayFunction(final RetryDelayFunction retryDelayFunction) { /** Holds the state machine data for a peer interaction. */ private class PeerInteractionState implements Predicate { + + private static final int MAX_RETRIES = 5; /** * The action that led to the peer being in this state (e.g. sending a PING or NEIGHBORS * message), in case it needs to be retried. */ private final Consumer action; + + private final BytesValue peerId; /** The expected type of the message that will transition the peer out of this state. */ private final PacketType expectedType; /** A custom filter to accept transitions out of this state. */ @@ -515,10 +519,12 @@ private class PeerInteractionState implements Predicate { PeerInteractionState( final Consumer action, + final BytesValue peerId, final PacketType expectedType, final Predicate filter, final boolean retryable) { this.action = action; + this.peerId = peerId; this.expectedType = expectedType; this.filter = filter; this.retryable = retryable; @@ -540,11 +546,15 @@ void updateFilter(final Predicate filter) { * @param lastTimeout the previous timeout, or 0 if this is the first time the action is being * executed. */ - void execute(final long lastTimeout) { + void execute(final long lastTimeout, final int retryCount) { action.accept(this); - if (retryable) { + if (retryable && retryCount < MAX_RETRIES) { final long newTimeout = retryDelayFunction.apply(lastTimeout); - timerId = OptionalLong.of(timerUtil.setTimer(newTimeout, () -> execute(newTimeout))); + timerId = + OptionalLong.of( + timerUtil.setTimer(newTimeout, () -> execute(newTimeout, retryCount + 1))); + } else { + inflightInteractions.remove(peerId); } } diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockTimerUtil.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockTimerUtil.java index b0a60433b6..6f2ce1ec66 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockTimerUtil.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockTimerUtil.java @@ -50,8 +50,7 @@ public void runHandlers() { public void runTimerHandlers() { // Create a copy of the handlers to avoid concurrent modification as handlers run - List handlers = new ArrayList<>(); - timerHandlers.forEach((id, handler) -> handlers.add(handler)); + final List handlers = new ArrayList<>(timerHandlers.values()); timerHandlers.clear(); handlers.forEach(TimerHandler::handle); diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java index e024d28551..764253c03c 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java @@ -192,19 +192,54 @@ public void bootstrapPeersRetriesStoppedUponResponse() { controller.onMessage(packet, peers.get(0)); // Invoke timers again - for (int i = 0; i < 4; i++) { + for (int i = 0; i < 2; i++) { timer.runTimerHandlers(); } // Ensure we receive no more PING packets for peer[0]. // Assert PING packet was sent for peer[0] 4 times. for (final DiscoveryPeer peer : peers) { - final int expectedCount = peer.equals(peers.get(0)) ? 4 : 8; + final int expectedCount = peer.equals(peers.get(0)) ? 4 : 6; verify(outboundMessageHandler, times(expectedCount)) .send(eq(peer), matchPacketOfType(PacketType.PING)); } } + @Test + public void shouldStopRetryingInteractionWhenLimitIsReached() { + // Create peers. + final List keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(3); + final List peers = helper.createDiscoveryPeers(keyPairs); + + final MockTimerUtil timer = new MockTimerUtil(); + final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class); + controller = + getControllerBuilder() + .peers(peers) + .timerUtil(timer) + .outboundMessageHandler(outboundMessageHandler) + .build(); + + // Mock the creation of the PING packet, so that we can control the hash, + // which gets validated when receiving the PONG. + final PingPacketData mockPing = + PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint()); + final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0)); + doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any()); + + controller.start(); + + // Invoke timers several times so that ping to peers should be resent + for (int i = 0; i < 10; i++) { + timer.runTimerHandlers(); + } + + // Assert PING packet was sent only 6 times (initial attempt plus 5 retries) + for (final DiscoveryPeer peer : peers) { + verify(outboundMessageHandler, times(6)).send(eq(peer), matchPacketOfType(PacketType.PING)); + } + } + @Test public void bootstrapPeersPongReceived_HashMatched() { // Create peers.