Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Ensure devp2p ports are written to ports file correctly #1020

Merged
merged 9 commits into from
Mar 4, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public TestNode create(

public void startNetworks() {
for (final TestNode node : nodes) {
node.network.run();
node.network.start();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcSuccessResponse;
import tech.pegasys.pantheon.ethereum.p2p.P2pDisabledException;
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import com.google.common.net.InetAddresses;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -67,25 +65,26 @@ public JsonRpcResponse response(final JsonRpcRequest req) {

try {
final Map<String, Object> response = new HashMap<>();
final BytesValue nodeId = peerNetwork.getLocalPeerInfo().getNodeId();
final InetSocketAddress address = peerNetwork.getDiscoverySocketAddress();
final int port = peerNetwork.getLocalPeerInfo().getPort();
final Map<String, Integer> ports = new HashMap<>();

final InetAddress inetAddress = address.getAddress();
response.put(
"enode",
"enode://"
+ nodeId.toString().substring(2)
+ "@"
+ InetAddresses.toUriString(inetAddress)
+ ":"
+ port);
final PeerInfo peerInfo = peerNetwork.getLocalPeerInfo();
final BytesValue nodeId = peerInfo.getNodeId();
peerNetwork
.getAdvertisedPeer()
.ifPresent(
advertisedPeer -> {
response.put("enode", advertisedPeer.getEnodeURI());
ports.put("discovery", advertisedPeer.getEndpoint().getUdpPort());
response.put("ip", advertisedPeer.getEndpoint().getHost());
response.put(
"listenAddr",
advertisedPeer.getEndpoint().getHost() + ":" + peerInfo.getPort());
});
response.put("id", nodeId.toString().substring(2));
// this doesn't provide a useful value yet.
// response.put("ip", inetAddress.getHostAddress());
response.put("listenAddr", InetAddresses.toUriString(inetAddress) + ":" + port);
response.put("name", clientVersion);
response.put("ports", ImmutableMap.of("discovery", port, "listener", port /*??*/));

ports.put("listener", peerInfo.getPort());
response.put("ports", ports);

final ChainHead chainHead = blockchainQueries.getBlockchain().getChainHead();
response.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.config.GenesisConfigOptions;
Expand All @@ -25,11 +26,11 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.queries.BlockchainQueries;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcSuccessResponse;
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork;
import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -51,17 +52,19 @@ public class AdminNodeInfoTest {

private AdminNodeInfo method;

private final PeerInfo localPeer =
new PeerInfo(5, "0x0", Collections.emptyList(), 30303, BytesValue.EMPTY);
private final InetSocketAddress discoverySocketAddress = new InetSocketAddress("1.2.3.4", 7890);
private final BytesValue nodeId =
BytesValue.fromHexString(
"0x0f1b319e32017c3fcb221841f0f978701b4e9513fe6a567a2db43d43381a9c7e3dfe7cae13cbc2f56943400bacaf9082576ab087cd51983b17d729ae796f6807");
private final PeerInfo localPeer = new PeerInfo(5, "0x0", Collections.emptyList(), 30303, nodeId);
private final ChainHead testChainHead = new ChainHead(Hash.EMPTY, UInt256.ONE);
private final GenesisConfigOptions genesisConfigOptions =
new StubGenesisConfigOptions().chainId(2019);
private final DefaultPeer defaultPeer = new DefaultPeer(nodeId, "1.2.3.4", 7890, 30303);

@Before
public void setup() {
when(p2pNetwork.getLocalPeerInfo()).thenReturn(localPeer);
when(p2pNetwork.getDiscoverySocketAddress()).thenReturn(discoverySocketAddress);
doReturn(Optional.of(this.defaultPeer)).when(p2pNetwork).getAdvertisedPeer();
when(blockchainQueries.getBlockchain()).thenReturn(blockchain);
when(blockchainQueries.getBlockHashByNumber(anyLong())).thenReturn(Optional.of(Hash.EMPTY));
when(blockchain.getChainHead()).thenReturn(testChainHead);
Expand All @@ -77,11 +80,16 @@ public void shouldReturnCorrectResult() {

final JsonRpcSuccessResponse actual = (JsonRpcSuccessResponse) method.response(request);
final Map<String, Object> expected = new HashMap<>();
expected.put("enode", "enode://@1.2.3.4:30303");
expected.put("id", "");
expected.put(
"enode",
"enode://0f1b319e32017c3fcb221841f0f978701b4e9513fe6a567a2db43d43381a9c7e3dfe7cae13cbc2f56943400bacaf9082576ab087cd51983b17d729ae796f6807@1.2.3.4:30303?discport=7890");
expected.put(
"id",
"0f1b319e32017c3fcb221841f0f978701b4e9513fe6a567a2db43d43381a9c7e3dfe7cae13cbc2f56943400bacaf9082576ab087cd51983b17d729ae796f6807");
expected.put("ip", "1.2.3.4");
expected.put("listenAddr", "1.2.3.4:30303");
expected.put("name", "testnet/1.0/this/that");
expected.put("ports", ImmutableMap.of("discovery", 30303, "listener", 30303));
expected.put("ports", ImmutableMap.of("discovery", 7890, "listener", 30303));
expected.put(
"protocols",
ImmutableMap.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.peers.DefaultPeer;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;
import tech.pegasys.pantheon.ethereum.p2p.wire.DefaultMessage;
Expand All @@ -25,7 +26,6 @@
import tech.pegasys.pantheon.ethereum.permissioning.NodeWhitelistController;
import tech.pegasys.pantheon.util.Subscribers;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -177,12 +177,12 @@ public void stop() {}
public void awaitStop() {}

@Override
public InetSocketAddress getDiscoverySocketAddress() {
return null;
public Optional<Peer> getAdvertisedPeer() {
return Optional.of(new DefaultPeer(self.getId(), "127.0.0.1", 0, 0));
}

@Override
public void run() {}
public void start() {}

@Override
public void close() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -48,9 +46,6 @@ public class NetworkRunner implements AutoCloseable {
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);

private final ExecutorService networkExecutor =
Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName()).build());
private final ScheduledExecutorService networkCheckExecutor =
Executors.newSingleThreadScheduledExecutor();

Expand Down Expand Up @@ -89,7 +84,7 @@ public void start() {
if (started.compareAndSet(false, true)) {
LOG.info("Starting Network.");
setupHandlers();
networkExecutor.submit(network);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure why this ever had its own dedicated executor - starting the network is actually very quick as it basically just opens a port and returns.

network.start();
networkCheckExecutor.scheduleWithFixedDelay(
network::checkMaintainedConnectionPeers, 60, 60, TimeUnit.SECONDS);
} else {
Expand All @@ -104,7 +99,6 @@ public void stop() {
for (final ProtocolManager protocolManager : protocolManagers) {
protocolManager.stop();
}
networkExecutor.shutdown();
networkCheckExecutor.shutdown();
shutdown.countDown();
} else {
Expand All @@ -118,11 +112,6 @@ public void awaitStop() throws InterruptedException {
for (final ProtocolManager protocolManager : protocolManagers) {
protocolManager.awaitStop();
}
if (!networkExecutor.awaitTermination(2L, TimeUnit.MINUTES)) {
LOG.error("Network executor did not shutdown cleanly.");
networkExecutor.shutdownNow();
networkExecutor.awaitTermination(2L, TimeUnit.MINUTES);
}
LOG.info("Network stopped.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import tech.pegasys.pantheon.ethereum.permissioning.NodeWhitelistController;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -63,9 +62,8 @@ public void stop() {}
public void awaitStop() {}

@Override
public InetSocketAddress getDiscoverySocketAddress() {
throw new P2pDisabledException(
"P2P networking disabled. Discovery socket address unavailable.");
public Optional<Peer> getAdvertisedPeer() {
return Optional.empty();
}

@Override
Expand All @@ -92,5 +90,5 @@ public Optional<NodeWhitelistController> getNodeWhitelistController() {
public void close() throws IOException {}

@Override
public void run() {}
public void start() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
import tech.pegasys.pantheon.ethereum.permissioning.NodeWhitelistController;

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** P2P Network Interface. */
public interface P2PNetwork extends Closeable, Runnable {
public interface P2PNetwork extends Closeable {

void start();

/**
* Returns a snapshot of the currently connected peer connections.
Expand All @@ -44,8 +45,8 @@ public interface P2PNetwork extends Closeable, Runnable {

/**
* Subscribe a {@link Consumer} to all incoming {@link Message} of a given sub-protocol. Calling
* {@link #run()} on an implementation without at least having one subscribed {@link Consumer} per
* supported sub-protocol should throw a {@link RuntimeException}.
* {@link #start()} on an implementation without at least having one subscribed {@link Consumer}
* per supported sub-protocol should throw a {@link RuntimeException}.
*
* @param capability Capability (sub-protocol) to subscribe to.
* @param consumer Consumer to subscribe
Expand Down Expand Up @@ -87,7 +88,7 @@ public interface P2PNetwork extends Closeable, Runnable {
/** Blocks until the P2P network layer has stopped. */
void awaitStop();

InetSocketAddress getDiscoverySocketAddress();
Optional<? extends Peer> getAdvertisedPeer();

/**
* Returns {@link PeerInfo} object for this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static tech.pegasys.pantheon.util.bytes.BytesValue.wrapBuffer;

Expand Down Expand Up @@ -83,7 +82,6 @@ public abstract class PeerDiscoveryAgent implements DisconnectCallback {

/* This is the {@link tech.pegasys.pantheon.ethereum.p2p.Peer} object holding who we are. */
private DiscoveryPeer advertisedPeer;
private InetSocketAddress localAddress;

/* Is discovery enabled? */
private boolean isActive = false;
Expand Down Expand Up @@ -135,7 +133,6 @@ public CompletableFuture<?> start(final int tcpPort) {
.thenAccept(
(InetSocketAddress localAddress) -> {
// Once listener is set up, finish initializing
this.localAddress = localAddress;
advertisedPeer =
new DiscoveryPeer(
id, config.getAdvertisedHost(), localAddress.getPort(), tcpPort);
Expand Down Expand Up @@ -221,19 +218,14 @@ public Collection<DiscoveryPeer> getPeers() {
.orElse(Collections.emptyList());
}

public DiscoveryPeer getAdvertisedPeer() {
return advertisedPeer;
public Optional<DiscoveryPeer> getAdvertisedPeer() {
return Optional.ofNullable(advertisedPeer);
}

public BytesValue getId() {
return id;
}

public InetSocketAddress localAddress() {
checkState(localAddress != null, "Uninitialized discovery agent");
return localAddress;
}

/**
* Adds an observer that will get called when a new peer is bonded with and added to the peer
* table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public PeerDiscoveryController(
this.peerDroppedObservers = peerDroppedObservers;
}

public CompletableFuture<?> start() {
public void start() {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("The peer table had already been started");
}
Expand Down Expand Up @@ -199,8 +199,6 @@ public CompletableFuture<?> start() {

nodeWhitelistController.ifPresent(
c -> c.subscribeToListUpdatedEvent(this::handleNodeWhitelistUpdatedEvent));

return CompletableFuture.completedFuture(null);
}

public CompletableFuture<?> stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,16 +467,12 @@ public void subscribeDisconnect(final DisconnectCallback callback) {
}

@Override
public void run() {
try {
peerDiscoveryAgent.start(ourPeerInfo.getPort()).join();
peerBondedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent()));
peerDroppedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerDroppedEvents(handlePeerDroppedEvents()));
} catch (final Exception ex) {
throw new IllegalStateException(ex);
}
public void start() {
peerDiscoveryAgent.start(ourPeerInfo.getPort()).join();
peerBondedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent()));
peerDroppedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerDroppedEvents(handlePeerDroppedEvents()));
}

private Consumer<PeerBondedEvent> handlePeerBondedEvent() {
Expand Down Expand Up @@ -546,8 +542,8 @@ public Collection<DiscoveryPeer> getDiscoveryPeers() {
}

@Override
public InetSocketAddress getDiscoverySocketAddress() {
return peerDiscoveryAgent.localAddress();
public Optional<? extends Peer> getAdvertisedPeer() {
return peerDiscoveryAgent.getAdvertisedPeer();
}

@Override
Expand Down
Loading