Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PAN-3155] Handle discovery peers with updated endpoints #12

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions acceptance-tests/dsl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,4 @@ dependencies {
implementation 'tech.pegasys.ethsigner.internal:core'
implementation 'tech.pegasys.ethsigner.internal:file-based'
implementation 'tech.pegasys.ethsigner.internal:signing-api'


}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public Endpoint getEndpoint() {
return endpoint;
}

public boolean discoveryEndpointMatches(final DiscoveryPeer peer) {
return peer.getEndpoint().getHost().equals(endpoint.getHost())
&& peer.getEndpoint().getUdpPort() == endpoint.getUdpPort();
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DiscoveryPeer{");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ synchronized Optional<DiscoveryPeer> add(final DiscoveryPeer peer)

// Avoid duplicating the peer if it already exists in the bucket.
for (int i = 0; i <= tailIndex; i++) {
if (peer.equals(kBucket[i])) {
if (peer.getId().equals(kBucket[i].getId())) {
throw new IllegalArgumentException(
String.format("Tried to add duplicate peer to k-bucket: %s", peer.getId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
}

// Load the peer from the table, or use the instance that comes in.
final Optional<DiscoveryPeer> maybeKnownPeer = peerTable.get(sender);
final Optional<DiscoveryPeer> maybeKnownPeer =
peerTable.get(sender).filter(known -> known.discoveryEndpointMatches(sender));
final DiscoveryPeer peer = maybeKnownPeer.orElse(sender);
final boolean peerKnown = maybeKnownPeer.isPresent();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.crypto.SECP256K1;
import org.hyperledger.besu.crypto.SECP256K1.KeyPair;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryTestHelper.AgentBuilder;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.FindNeighborsPacketData;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.MockPeerDiscoveryAgent;
Expand Down Expand Up @@ -308,6 +310,104 @@ public void bonding_disallowOutgoingBonding() {
assertThat(remoteIncomingPackets).isEmpty();
}

/**
* These tests simulates the case where a node crashes then comes back up with a new ip address or
* listening port.
*/
@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedPort() {
simulatePeerRestartingOnDifferentEndpoint(false, true);
}

@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedHost() {
simulatePeerRestartingOnDifferentEndpoint(true, false);
}

@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedHostAndPort() {
simulatePeerRestartingOnDifferentEndpoint(true, true);
}

public void simulatePeerRestartingOnDifferentEndpoint(
final boolean updateHost, final boolean updatePort) {
// Setup peer
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent();
final DiscoveryPeer agentPeer = agent.getAdvertisedPeer().get();

final KeyPair remoteKeyPair = SECP256K1.KeyPair.generate();
final String remoteIp = "1.2.3.4";
final MockPeerDiscoveryAgent remoteAgent =
helper.createDiscoveryAgent(
helper
.agentBuilder()
.keyPair(remoteKeyPair)
.advertisedHost(remoteIp)
.bootstrapPeers(agentPeer));

agent.start(999);
remoteAgent.start(888);
final DiscoveryPeer remotePeer = remoteAgent.getAdvertisedPeer().get();

// Remote agent should have bonded with agent
assertThat(agent.streamDiscoveredPeers()).hasSize(1);
assertThat(agent.streamDiscoveredPeers()).contains(remoteAgent.getAdvertisedPeer().get());

// Create a new remote agent with same id, and new endpoint
remoteAgent.stop();
final int newPort = updatePort ? 0 : remotePeer.getEndpoint().getUdpPort();
final String newIp = updateHost ? "1.2.3.5" : remoteIp;
final MockPeerDiscoveryAgent updatedRemoteAgent =
helper.createDiscoveryAgent(
helper
.agentBuilder()
.keyPair(remoteKeyPair)
.advertisedHost(newIp)
.bindPort(newPort)
.bootstrapPeers(agentPeer));
updatedRemoteAgent.start(889);
final DiscoveryPeer updatedRemotePeer = updatedRemoteAgent.getAdvertisedPeer().get();

// Sanity check
assertThat(
updatedRemotePeer.getEndpoint().getUdpPort() == remotePeer.getEndpoint().getUdpPort())
.isEqualTo(!updatePort);
assertThat(updatedRemotePeer.getEndpoint().getHost().equals(remotePeer.getEndpoint().getHost()))
.isEqualTo(!updateHost);
assertThat(updatedRemotePeer.getId()).isEqualTo(remotePeer.getId());

// Check that our restarted agent receives a PONG response
final List<IncomingPacket> incomingPackets = updatedRemoteAgent.getIncomingPackets();
assertThat(incomingPackets).hasSizeGreaterThan(0);
final long pongCount =
incomingPackets.stream()
.filter(packet -> packet.fromAgent.equals(agent))
.filter(packet -> packet.packet.getType().equals(PacketType.PONG))
.count();
assertThat(pongCount).isGreaterThan(0);

// Check that agent has an endpoint matching the restarted node
final List<DiscoveryPeer> matchingPeers =
agent
.streamDiscoveredPeers()
.filter(peer -> peer.getId().equals(updatedRemotePeer.getId()))
.collect(toList());
// We should have only one peer matching this id
assertThat(matchingPeers.size()).isEqualTo(1);
final DiscoveryPeer discoveredPeer = matchingPeers.get(0);
assertThat(discoveredPeer.getEndpoint().getUdpPort())
.isEqualTo(updatedRemotePeer.getEndpoint().getUdpPort());
assertThat(discoveredPeer.getEndpoint().getHost())
.isEqualTo(updatedRemotePeer.getEndpoint().getHost());
// Check endpoint is consistent with enodeURL
assertThat(discoveredPeer.getEnodeURL().getDiscoveryPortOrZero())
.isEqualTo(updatedRemotePeer.getEndpoint().getUdpPort());
assertThat(discoveredPeer.getEnodeURL().getListeningPortOrZero())
.isEqualTo(updatedRemotePeer.getEndpoint().getFunctionalTcpPort());
assertThat(discoveredPeer.getEnodeURL().getIpAsString())
.isEqualTo(updatedRemotePeer.getEndpoint().getHost());
}

@Test
public void neighbors_allowOutgoingRequest() {
// Setup peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.hyperledger.besu.ethereum.p2p.discovery;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Arrays.asList;

import org.hyperledger.besu.crypto.SECP256K1;
Expand All @@ -31,6 +32,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -179,6 +181,9 @@ public static class AgentBuilder {
private List<EnodeURL> bootnodes = Collections.emptyList();
private boolean active = true;
private PeerPermissions peerPermissions = PeerPermissions.noop();
private String advertisedHost = "127.0.0.1";
private OptionalInt bindPort = OptionalInt.empty();
private KeyPair keyPair = SECP256K1.KeyPair.generate();

private AgentBuilder(
final Map<BytesValue, MockPeerDiscoveryAgent> agents,
Expand Down Expand Up @@ -215,14 +220,37 @@ public AgentBuilder active(final boolean active) {
return this;
}

public AgentBuilder advertisedHost(final String host) {
checkNotNull(host);
this.advertisedHost = host;
return this;
}

public AgentBuilder bindPort(final int bindPort) {
if (bindPort == 0) {
// Zero means pick the next available port
this.bindPort = OptionalInt.empty();
return this;
}
this.bindPort = OptionalInt.of(bindPort);
return this;
}

public AgentBuilder keyPair(final KeyPair keyPair) {
checkNotNull(keyPair);
this.keyPair = keyPair;
return this;
}

public MockPeerDiscoveryAgent build() {
final int port = bindPort.orElseGet(nextAvailablePort::incrementAndGet);
final DiscoveryConfiguration config = new DiscoveryConfiguration();
config.setBootnodes(bootnodes);
config.setBindPort(nextAvailablePort.incrementAndGet());
config.setAdvertisedHost(advertisedHost);
config.setBindPort(port);
config.setActive(active);

return new MockPeerDiscoveryAgent(
SECP256K1.KeyPair.generate(), config, peerPermissions, agents);
return new MockPeerDiscoveryAgent(keyPair, config, peerPermissions, agents);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent {
// The set of known agents operating on the network
private final Map<BytesValue, MockPeerDiscoveryAgent> agentNetwork;
private final Deque<IncomingPacket> incomingPackets = new ArrayDeque<>();
private boolean isRunning = false;

public MockPeerDiscoveryAgent(
final KeyPair keyPair,
Expand Down Expand Up @@ -65,25 +66,45 @@ public List<IncomingPacket> getIncomingPackets() {

@Override
protected CompletableFuture<InetSocketAddress> listenForConnections() {
isRunning = true;
// Skip network setup for tests
InetSocketAddress address =
new InetSocketAddress(config.getAdvertisedHost(), config.getBindPort());
InetSocketAddress address = new InetSocketAddress(config.getBindHost(), config.getBindPort());
return CompletableFuture.completedFuture(address);
}

@Override
protected CompletableFuture<Void> sendOutgoingPacket(
final DiscoveryPeer toPeer, final Packet packet) {
CompletableFuture<Void> result = new CompletableFuture<>();
if (!this.isRunning) {
result.completeExceptionally(new Exception("Attempt to send message from an inactive agent"));
}

MockPeerDiscoveryAgent toAgent = agentNetwork.get(toPeer.getId());
if (toAgent == null) {
result.completeExceptionally(
new Exception(
"Attempt to send to unknown peer. Agents must be constructed through PeerDiscoveryTestHelper."));
return result;
}

final DiscoveryPeer agentPeer = toAgent.getAdvertisedPeer().get();
if (!toPeer.getEndpoint().getHost().equals(agentPeer.getEndpoint().getHost())) {
LOG.warn(
"Attempt to send packet to discovery peer using the wrong host address. Sending to {}, but discovery peer is listening on {}",
toPeer.getEndpoint().getHost(),
agentPeer.getEndpoint().getHost());
} else if (toPeer.getEndpoint().getUdpPort() != agentPeer.getEndpoint().getUdpPort()) {
LOG.warn(
"Attempt to send packet to discovery peer using the wrong udp port. Sending to {}, but discovery peer is listening on {}",
toPeer.getEndpoint().getUdpPort(),
agentPeer.getEndpoint().getUdpPort());
} else if (!toAgent.isRunning) {
LOG.warn("Attempt to send packet to an inactive peer.");
} else {
toAgent.processIncomingPacket(this, packet);
result.complete(null);
}
result.complete(null);
return result;
}

Expand All @@ -99,6 +120,7 @@ protected AsyncExecutor createWorkerExecutor() {

@Override
public CompletableFuture<?> stop() {
isRunning = false;
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@

import static org.assertj.core.api.Assertions.assertThat;

import org.hyperledger.besu.crypto.SECP256K1.KeyPair;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.Endpoint;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryTestHelper;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.AddResult.AddOutcome;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.EvictResult;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.EvictResult.EvictOutcome;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.util.bytes.BytesValue;

import java.util.List;
import java.util.OptionalInt;

import org.junit.Test;

Expand Down Expand Up @@ -73,6 +77,69 @@ public void peerExists() {
});
}

@Test
public void peerExists_withDifferentIp() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final BytesValue peerId = KeyPair.generate().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30303, OptionalInt.empty()));

assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED);

final DiscoveryPeer duplicatePeer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.2", 30303, OptionalInt.empty()));
assertThat(table.tryAdd(duplicatePeer))
.satisfies(
result -> {
assertThat(result.getOutcome()).isEqualTo(AddOutcome.ALREADY_EXISTED);
assertThat(result.getEvictionCandidate()).isNull();
});
}

@Test
public void peerExists_withDifferentUdpPort() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final BytesValue peerId = KeyPair.generate().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30303, OptionalInt.empty()));

assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED);

final DiscoveryPeer duplicatePeer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30301, OptionalInt.empty()));
assertThat(table.tryAdd(duplicatePeer))
.satisfies(
result -> {
assertThat(result.getOutcome()).isEqualTo(AddOutcome.ALREADY_EXISTED);
assertThat(result.getEvictionCandidate()).isNull();
});
}

@Test
public void peerExists_withDifferentIdAndUdpPort() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final BytesValue peerId = KeyPair.generate().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30303, OptionalInt.empty()));

assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED);

final DiscoveryPeer duplicatePeer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.2", 30301, OptionalInt.empty()));
assertThat(table.tryAdd(duplicatePeer))
.satisfies(
result -> {
assertThat(result.getOutcome()).isEqualTo(AddOutcome.ALREADY_EXISTED);
assertThat(result.getEvictionCandidate()).isNull();
});
}

@Test
public void evictExistingPeerShouldEvict() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
Expand Down