Skip to content

Commit

Permalink
Limit the number of times we retry peer discovery interactions. (Pega…
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored and tmohay committed Feb 20, 2019
1 parent f37cc2d commit ffe38f1
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ void bond(final DiscoveryPeer peer) {

// The filter condition will be updated as soon as the action is performed.
final PeerInteractionState ping =
new PeerInteractionState(action, PacketType.PONG, (packet) -> false, true);
new PeerInteractionState(action, peer.getId(), PacketType.PONG, (packet) -> false, true);
dispatchInteraction(peer, ping);
}

Expand Down Expand Up @@ -441,7 +441,7 @@ private void findNodes(final DiscoveryPeer peer, final BytesValue target) {
sendPacket(peer, PacketType.FIND_NEIGHBORS, data);
};
final PeerInteractionState interaction =
new PeerInteractionState(action, PacketType.NEIGHBORS, packet -> true, true);
new PeerInteractionState(action, peer.getId(), PacketType.NEIGHBORS, packet -> true, true);
dispatchInteraction(peer, interaction);
}

Expand All @@ -459,7 +459,7 @@ private void dispatchInteraction(final Peer peer, final PeerInteractionState sta
if (previous != null) {
previous.cancelTimers();
}
state.execute(0);
state.execute(0, 0);
}

private void respondToPing(
Expand Down Expand Up @@ -499,11 +499,15 @@ public void setRetryDelayFunction(final RetryDelayFunction retryDelayFunction) {

/** Holds the state machine data for a peer interaction. */
private class PeerInteractionState implements Predicate<Packet> {

private static final int MAX_RETRIES = 5;
/**
* The action that led to the peer being in this state (e.g. sending a PING or NEIGHBORS
* message), in case it needs to be retried.
*/
private final Consumer<PeerInteractionState> action;

private final BytesValue peerId;
/** The expected type of the message that will transition the peer out of this state. */
private final PacketType expectedType;
/** A custom filter to accept transitions out of this state. */
Expand All @@ -515,10 +519,12 @@ private class PeerInteractionState implements Predicate<Packet> {

PeerInteractionState(
final Consumer<PeerInteractionState> action,
final BytesValue peerId,
final PacketType expectedType,
final Predicate<Packet> filter,
final boolean retryable) {
this.action = action;
this.peerId = peerId;
this.expectedType = expectedType;
this.filter = filter;
this.retryable = retryable;
Expand All @@ -540,11 +546,15 @@ void updateFilter(final Predicate<Packet> filter) {
* @param lastTimeout the previous timeout, or 0 if this is the first time the action is being
* executed.
*/
void execute(final long lastTimeout) {
void execute(final long lastTimeout, final int retryCount) {
action.accept(this);
if (retryable) {
if (retryable && retryCount < MAX_RETRIES) {
final long newTimeout = retryDelayFunction.apply(lastTimeout);
timerId = OptionalLong.of(timerUtil.setTimer(newTimeout, () -> execute(newTimeout)));
timerId =
OptionalLong.of(
timerUtil.setTimer(newTimeout, () -> execute(newTimeout, retryCount + 1)));
} else {
inflightInteractions.remove(peerId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public void runHandlers() {

public void runTimerHandlers() {
// Create a copy of the handlers to avoid concurrent modification as handlers run
List<TimerHandler> handlers = new ArrayList<>();
timerHandlers.forEach((id, handler) -> handlers.add(handler));
final List<TimerHandler> handlers = new ArrayList<>(timerHandlers.values());
timerHandlers.clear();

handlers.forEach(TimerHandler::handle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,54 @@ public void bootstrapPeersRetriesStoppedUponResponse() {
controller.onMessage(packet, peers.get(0));

// Invoke timers again
for (int i = 0; i < 4; i++) {
for (int i = 0; i < 2; i++) {
timer.runTimerHandlers();
}

// Ensure we receive no more PING packets for peer[0].
// Assert PING packet was sent for peer[0] 4 times.
for (final DiscoveryPeer peer : peers) {
final int expectedCount = peer.equals(peers.get(0)) ? 4 : 8;
final int expectedCount = peer.equals(peers.get(0)) ? 4 : 6;
verify(outboundMessageHandler, times(expectedCount))
.send(eq(peer), matchPacketOfType(PacketType.PING));
}
}

@Test
public void shouldStopRetryingInteractionWhenLimitIsReached() {
// Create peers.
final List<SECP256K1.KeyPair> keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(3);
final List<DiscoveryPeer> peers = helper.createDiscoveryPeers(keyPairs);

final MockTimerUtil timer = new MockTimerUtil();
final OutboundMessageHandler outboundMessageHandler = mock(OutboundMessageHandler.class);
controller =
getControllerBuilder()
.peers(peers)
.timerUtil(timer)
.outboundMessageHandler(outboundMessageHandler)
.build();

// Mock the creation of the PING packet, so that we can control the hash,
// which gets validated when receiving the PONG.
final PingPacketData mockPing =
PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint());
final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0));
doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any());

controller.start();

// Invoke timers several times so that ping to peers should be resent
for (int i = 0; i < 10; i++) {
timer.runTimerHandlers();
}

// Assert PING packet was sent only 6 times (initial attempt plus 5 retries)
for (final DiscoveryPeer peer : peers) {
verify(outboundMessageHandler, times(6)).send(eq(peer), matchPacketOfType(PacketType.PING));
}
}

@Test
public void bootstrapPeersPongReceived_HashMatched() {
// Create peers.
Expand Down

0 comments on commit ffe38f1

Please sign in to comment.