-
Notifications
You must be signed in to change notification settings - Fork 130
Integration of RecursivePeerRefreshState and PeerDiscoveryController #420
Integration of RecursivePeerRefreshState and PeerDiscoveryController #420
Conversation
15a7c0b
to
c04e6f3
Compare
7062a0e
to
6081235
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main concern is with the core RecursivePeerRefreshState
functionality - we can discuss offline.
} | ||
addToPeerTable(peer); | ||
if (interaction.isBootstrap()) { | ||
findNodes(peer, agent.getAdvertisedPeer().getId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this findNodes request should be handled in recursivePeerRefreshState
final PeerInteractionState interaction = | ||
new PeerInteractionState(action, PacketType.NEIGHBORS, packet -> true, true, false); | ||
dispatchInteraction(peer, interaction); | ||
new PeerInteractionState(action, PacketType.NEIGHBORS, filter -> true, true, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the original packet
var name is clearer
new PeerInteractionState(action, PacketType.NEIGHBORS, packet -> true, true, false); | ||
dispatchInteraction(peer, interaction); | ||
new PeerInteractionState(action, PacketType.NEIGHBORS, filter -> true, true, false); | ||
interaction.execute(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if you still need this interaction object or not? It does contain some retry functionality. If you think its worth using interaction, you'll need to clean up the retry timer.
final BytesValue target, | ||
final PeerBlacklist peerBlacklist, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to cut the blacklist here? This should be passed by reference, so no need to keep sending in the same instance of the blacklist via onNeighboursPacketReceived
.
private final BondingAgent bondingAgent; | ||
private final NeighborFinder neighborFinder; | ||
private final List<PeerDistance> anteList; | ||
private final List<OutstandingRequest> outstandingRequestList; | ||
private final List<BytesValue> contactedInCurrentExecution; | ||
private final List<BytesValue> dispatchedFindNeighbours; | ||
private final List<BytesValue> dispatchedBond; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be maps rather than lists? A HashMap
will have O(1) rather than O(n) lookup speed.
|
||
final RecursivePeerRefreshState recursivePeerRefreshState = | ||
controller.getRecursivePeerRefreshState(); | ||
recursivePeerRefreshState.addToOutstandingRequestList(peers[0]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this block added?
|
||
await() | ||
.atMost(5, TimeUnit.SECONDS) | ||
.untilAsserted(() -> assertThat(peerDiscoveryAgent_TEST.getPeers()).hasSize(2)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this test meant to prove? It looks like you set up agent0 with no peers, agent1 with peers [agent0], then wait for those two nodes to exchange messages so that they are each tracking the other. Then agent TEST is setup with peers [agent0], and TEST will ask agent0 for all of its peers. This doesn't seem get at the iterative search though since agent0 already has the other peer in its table, so no recursion is needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test was initially failing as a result of the changes I made to the controller, so I deconstructed it to figure out exactly why.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, can we cut this? Sounds like you added this as a debugging step?
...main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java
Show resolved
Hide resolved
|
||
recursivePeerRefreshState = | ||
new RecursivePeerRefreshState(Peer.randomId(), this::bond, this::findNodes); | ||
recursivePeerRefreshState.kickstartBootstrapPeers( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't you also need to run the recursive lookup when refreshTable
is invoked?
|
||
RecursivePeerRefreshState( | ||
public RecursivePeerRefreshState( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General comment - this class doesn't seem to follow the pattern described in the docs: https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup
I don't see a terminating condition for when this search would conclude.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a short blerb to initiatePeerRefreshCycle
that describes the terminating condition.
a64ab90
to
8c75ee1
Compare
&& !outstandingRequestList.contains(new OutstandingRequest(candidate))) { | ||
outstandingRequestList.remove(i); | ||
&& !outstandingNeighboursRequestList.contains(new OutstandingRequest(candidate))) { | ||
outstandingNeighboursRequestList.remove(i); | ||
executeFindNodeRequest(candidate); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding some notes based on our offline discussion:
We talked about reworking the way that "rounds" are handled. Where a round is defined as the point at which we issue requests to a new set of peers. As it stands now, we can cycle in one or more new requests as timeouts are processed. This means that you can end up sending out < concurrentRequestLimit
requests at this point.
Instead, we should issue up to concurrentRequestLimit
messages at each round. We can do this by initiating a new round when either: a) all peers contacted during the previous round have responded with their neighbors or b) the previous round has timed out.
...in/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java
Show resolved
Hide resolved
@@ -151,19 +156,15 @@ void onNeighboursPacketReceived( | |||
|
|||
private void queryNearestNodes() { | |||
final int concurrentRequestLimit = 3; | |||
if (outstandingRequestList.isEmpty()) { | |||
if (outstandingNeighboursRequestList.isEmpty()) { | |||
final List<DiscoveryPeer> queryCandidates = | |||
determineFindNodeCandidates(concurrentRequestLimit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding some notes based on our offline discussion:
It looks to me like the algorithm for selecting nodes to query (defined in determineFindNodeCandidates
) is not quite right. The docs describe how to select the next set of nodes to query:
Of the k nodes the initiator has heard of closest to the target, it picks α that it has not yet queried and resends FindNode to them.
As it stands currently, it looks like you're always selecting the peers closest to the target regardless of whether we have contacted those peers already, or if those peers are contained in the set of the k
closest peers.
084fdee
to
ba895b8
Compare
ada398d
to
d91761c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like its getting close! I didn't get through everything, but left a few comments in the meantime.
@@ -342,24 +340,29 @@ private void refreshTableIfRequired() { | |||
} | |||
} | |||
|
|||
@VisibleForTesting | |||
public RecursivePeerRefreshState getRecursivePeerRefreshState() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the visibility here be limited? Looks like package-private should work.
captor | ||
.getAllValues() | ||
.stream() | ||
.filter(p -> p.getType().equals(PacketType.FIND_NEIGHBORS)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that this test isn't checking what it was originally intended to check. I think you probably want to revert this back to filtering for neighbors packets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's inaccurate. this test does check what it was originally intended to check, however the mechanism that effectuates the action that it evaluates has changed. the now defunct comment that reads "as the controller performs refreshes, it'll send FIND_NEIGHBORS packets with random target IDs every time" provides a clue, since that doesn't describe the refresh functionality introduced in this pr. the new comment highlights specifically what has changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Originally, this test was causing multiple rounds of table refreshes, and inspecting the dispatched FIND_NEIGHBORS requests in each round, verifying that a unique target was picked on each refresh. As the code currently stands, you're checking that several rounds of PINGS have been sent out, and doing extra work to verify that each PING is sent to the singular peer in the table, which doesn't seem all that useful.
I suggest adding PONG responses in your for loop from the singular peer so that you can capture the FIND_NEIGHBORS requests and run the original target checks.
for (final Map.Entry<BytesValue, MetadataPeer> entry : oneTrueMap.entrySet()) { | ||
final MetadataPeer metadataPeer = entry.getValue(); | ||
if (metadataPeer.hasOutstandingBondRequest()) { | ||
metadataPeer.bondingComplete(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a little confusing that we're setting bonding to "complete" here when we're "cancelling" the outstanding request. It may be worth adding an explanatory comment.
private final BytesValue target; | ||
public class RecursivePeerRefreshState { | ||
private static final Logger LOG = LogManager.getLogger(); | ||
private static final int MAX_CONCURRENT_REQUESTS = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these parameters where we want them (MAX_CONCURRENT_REQUESTS
and maxRounds
)? Seems like we could have a higher level of concurrency and a lower number of rounds.
} | ||
|
||
@VisibleForTesting | ||
List<OutstandingRequest> getOutstandingRequestList() { | ||
return outstandingRequestList; | ||
void cancelCurrentRound() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think plain cancel
might be a better name. Since we're cancelling the search altogether, not just the current round within the search.
@@ -39,6 +41,11 @@ public Bytes32 keccak256() { | |||
return keccak256; | |||
} | |||
|
|||
@VisibleForTesting | |||
public void setKeccak256(final Bytes32 keccak256) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we mock this where we need it for testing? Instead of adding this setter to the production code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It has been mocked before, but the new paradigm of peer refresh requires that we have actual peers being created in the createPeersInLastBucket
method of PeerDiscoveryControllerTest
, so that we can query them for associated metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can stub out the behavior of real peer objects there like so:
final DiscoveryPeer peer =
spy(new DiscoveryPeer(
id,
new Endpoint(
localPeer.getEndpoint().getHost(),
100 + counter.incrementAndGet(),
OptionalInt.empty())));
doReturn(keccak).when(peer).keccak256();
for (int i = 0; i < timeouts; i++) { | ||
timer.runTimerHandlers(); | ||
} | ||
int expectedTimerEvents = (timeouts + 1) * peerCount; | ||
final int expectedTimerEvents = 1 + (timeouts + 1) * peerCount; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add the 1 here? Is it more accurate to change times
to atLeast
on line 125: verify(timer, atLeast(expectedTimerEvents)).setTimer(anyLong(), any());
?
final Packet packet = Packet.create(PacketType.PONG, packetData, keyPairs.get(0)); | ||
controller.onMessage(packet, peers.get(0)); | ||
|
||
for (int i = 0; i < 3; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for (int i = 0; i < 3; i++) { | |
for (int i = 0; i < peers.size(); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually - I'm wondering why you changed this test to have all peers responding instead of just one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the "bonding round" needs to conclude before the "find neighbours round" can begin. one way to achieve that is for all peers to respond, the other way is for the round to timeout.
.../java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java
Outdated
Show resolved
Hide resolved
PongPacketData.create(localPeer.getEndpoint(), mockPacket.getHash()); | ||
final Packet packet0 = Packet.create(PacketType.PONG, packetData, keyPairs.get(i)); | ||
controller.onMessage(packet0, peers.get(i)); | ||
} | ||
|
||
// Ensure that the peer controller is now sending FIND_NEIGHBORS messages for this peer. | ||
verify(outboundMessageHandler, times(1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can loop over your peers here (and below) instead of explicitly checking each one by hard-coded index
…discovery/internal/PeerDiscoveryControllerTest.java Co-Authored-By: s-matthew-english <[email protected]>
@@ -180,7 +179,6 @@ private void buildBloomFilter() { | |||
public List<DiscoveryPeer> nearestPeers(final BytesValue target, final int limit) { | |||
final BytesValue keccak256 = Hash.keccak256(target); | |||
return getAllPeers().stream() | |||
.filter(p -> p.getStatus() == PeerDiscoveryStatus.BONDED) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this status filtering being removed?
} | ||
|
||
@VisibleForTesting | ||
public BytesValue getTarget() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this method is unused - can we cut it?
LOG.debug("Received neighbours packet with {} neighbours", neighboursPacket.getNodes().size()); | ||
for (final DiscoveryPeer receivedDiscoPeer : neighboursPacket.getNodes()) { | ||
if (!oneTrueMap.containsKey(receivedDiscoPeer.getId()) | ||
&& !peerBlacklist.contains(receivedDiscoPeer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, we were also filtering out known peers here (peers already in the peer table). It seems to me that we should still be checking these peers against the peer table.
@@ -39,6 +41,11 @@ public Bytes32 keccak256() { | |||
return keccak256; | |||
} | |||
|
|||
@VisibleForTesting | |||
public void setKeccak256(final Bytes32 keccak256) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can stub out the behavior of real peer objects there like so:
final DiscoveryPeer peer =
spy(new DiscoveryPeer(
id,
new Endpoint(
localPeer.getEndpoint().getHost(),
100 + counter.incrementAndGet(),
OptionalInt.empty())));
doReturn(keccak).when(peer).keccak256();
|
||
await() | ||
.atMost(5, TimeUnit.SECONDS) | ||
.untilAsserted(() -> assertThat(peerDiscoveryAgent_TEST.getPeers()).hasSize(2)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, can we cut this? Sounds like you added this as a debugging step?
...ava/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java
Outdated
Show resolved
Hide resolved
...ava/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java
Outdated
Show resolved
Hide resolved
...ava/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java
Outdated
Show resolved
Hide resolved
...ava/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java
Outdated
Show resolved
Hide resolved
this.maxRounds = maxRounds; | ||
} | ||
|
||
synchronized void start(final List<DiscoveryPeer> initialPeers, final BytesValue target) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started to add notes on some thread-safety issues I noticed, but I looked into it more and this should all actually be single-threaded within the vertx event loop. So you should be able to cut all of the synchronized keywords.
…discovery/internal/RecursivePeerRefreshStateTest.java Co-Authored-By: s-matthew-english <[email protected]>
…discovery/internal/RecursivePeerRefreshStateTest.java Co-Authored-By: s-matthew-english <[email protected]>
…discovery/internal/RecursivePeerRefreshStateTest.java Co-Authored-By: s-matthew-english <[email protected]>
…discovery/internal/RecursivePeerRefreshStateTest.java Co-Authored-By: s-matthew-english <[email protected]>
As specified by [Implement iterative peer search] (NC-875).