Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change map key in eth peers #3383

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public EthSynchronizerUpdater(final EthPeers ethPeers) {
@Override
public void updatePeerChainState(
final long knownBlockNumber, final PeerConnection peerConnection) {
final EthPeer ethPeer = ethPeers.peer(peerConnection);
final EthPeer ethPeer = ethPeers.peer(peerConnection.getPeer().getId());
if (ethPeer == null) {
LOG.debug("Received message from a peer with no corresponding EthPeer.");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.tuweni.bytes.Bytes;

public class EthPeers {
public static final Comparator<EthPeer> TOTAL_DIFFICULTY =
Comparator.comparing(((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty()));
Expand All @@ -49,7 +51,7 @@ public class EthPeers {
Comparator.comparing(EthPeer::outstandingRequests)
.thenComparing(EthPeer::getLastRequestTimestamp);

private final Map<PeerConnection, EthPeer> connections = new ConcurrentHashMap<>();
private final Map<Bytes, EthPeer> connections = new ConcurrentHashMap<>();
private final String protocolName;
private final Clock clock;
private final List<NodeMessagePermissioningProvider> permissioningProviders;
Expand Down Expand Up @@ -93,11 +95,11 @@ public void registerConnection(
peerValidators,
clock,
permissioningProviders);
connections.putIfAbsent(peerConnection, peer);
connections.putIfAbsent(peerConnection.getPeer().getId(), peer);
}

public void registerDisconnect(final PeerConnection connection) {
final EthPeer peer = connections.remove(connection);
final EthPeer peer = connections.remove(connection.getPeer().getId());
if (peer != null) {
disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer));
peer.handleDisconnect();
Expand All @@ -118,8 +120,8 @@ private void abortPendingRequestsAssignedToDisconnectedPeers() {
}
}

public EthPeer peer(final PeerConnection peerConnection) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the public interface method to this class needs a type change. It's fine to make the internal map keyed by Bytes, but I think the public interface method into EthPeers gets less readable when we make the parameter more primitive. Can we keep the
parameter as a PeerConnection, and just interact with the Map using its getPeer().getId()?

return connections.get(peerConnection);
public EthPeer peer(final Bytes peerId) {
return connections.get(peerId);
}

public PendingPeerRequest executePeerRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void processMessage(final Capability cap, final Message message) {
final MessageData messageData = message.getData();
final int code = messageData.getCode();
LOG.trace("Process message {}, {}", cap, code);
final EthPeer ethPeer = ethPeers.peer(message.getConnection());
final EthPeer ethPeer = ethPeers.peer(message.getConnection().getPeer().getId());
if (ethPeer == null) {
LOG.debug(
"Ignoring message received from unknown peer connection: " + message.getConnection());
Expand Down Expand Up @@ -306,7 +306,7 @@ public void processMessage(final Capability cap, final Message message) {
@Override
public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerConnection(connection, peerValidators);
final EthPeer peer = ethPeers.peer(connection);
final EthPeer peer = ethPeers.peer(connection.getPeer().getId());
if (peer.statusHasBeenSentToPeer()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void processMessage(final Capability cap, final Message message) {
final MessageData messageData = AbstractSnapMessageData.create(message);
final int code = messageData.getCode();
LOG.trace("Process snap message {}, {}", cap, code);
final EthPeer ethPeer = ethPeers.peer(message.getConnection());
final EthPeer ethPeer = ethPeers.peer(message.getConnection().getPeer().getId());
if (ethPeer == null) {
LOG.debug(
"Ignoring message received from unknown peer connection: " + message.getConnection());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private static RespondingEthPeer create(
new MockPeerConnection(
caps, (cap, msg, conn) -> outgoingMessages.add(new OutgoingMessage(cap, msg)));
ethPeers.registerConnection(peerConnection, peerValidators);
final EthPeer peer = ethPeers.peer(peerConnection);
final EthPeer peer = ethPeers.peer(peerConnection.getPeer().getId());
peer.registerStatusReceived(chainHeadHash, totalDifficulty, 63);
estimatedHeight.ifPresent(height -> peer.chainState().update(chainHeadHash, height));
peer.registerStatusSent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected AsyncExecutor createWorkerExecutor() {

@Override
protected CompletableFuture<InetSocketAddress> listenForConnections() {
CompletableFuture<InetSocketAddress> future = new CompletableFuture<>();
final CompletableFuture<InetSocketAddress> future = new CompletableFuture<>();
vertx
.createDatagramSocket(new DatagramSocketOptions().setIpV6(NetworkUtility.isIPv6Available()))
.listen(
Expand Down Expand Up @@ -135,8 +135,7 @@ protected void handleListenerSetup(
this.socket = listenResult.result();

// TODO: when using wildcard hosts (0.0.0.0), we need to handle multiple addresses by
// selecting
// the correct 'announce' address.
// selecting the correct 'announce' address.
final String effectiveHost = socket.localAddress().host();
final int effectivePort = socket.localAddress().port();

Expand All @@ -148,15 +147,15 @@ protected void handleListenerSetup(
socket.exceptionHandler(this::handleException);
socket.handler(this::handlePacket);

InetSocketAddress address =
final InetSocketAddress address =
new InetSocketAddress(socket.localAddress().host(), socket.localAddress().port());
addressFuture.complete(address);
}

@Override
protected CompletableFuture<Void> sendOutgoingPacket(
final DiscoveryPeer peer, final Packet packet) {
CompletableFuture<Void> result = new CompletableFuture<>();
final CompletableFuture<Void> result = new CompletableFuture<>();
socket.send(
packet.encode(),
peer.getEndpoint().getUdpPort(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ public static EnodeURL fromURI(final URI uri, final EnodeDnsConfiguration enodeD
"Invalid node ID: node ID must have exactly 128 hexadecimal characters and should not include any '0x' hex prefix.");

final Bytes id = Bytes.fromHexString(uri.getUserInfo());
String host = uri.getHost();
int tcpPort = uri.getPort();
final String host = uri.getHost();
final int tcpPort = uri.getPort();

// Parse discport if it exists
Optional<Integer> discoveryPort = Optional.empty();
String query = uri.getQuery();
final String query = uri.getQuery();
if (query != null) {
final Matcher discPortMatcher = DISCPORT_QUERY_STRING_REGEX.matcher(query);
if (discPortMatcher.matches()) {
Expand Down Expand Up @@ -322,11 +322,11 @@ private void validate() {
checkState(ip != null, "Ip address must be configured.");
}

public Builder configureFromEnode(final EnodeURL enode) {
return this.nodeId(enode.getNodeId())
.listeningPort(enode.getListeningPort())
.discoveryPort(enode.getDiscoveryPort())
.ipAddress(enode.getIp());
public Builder configureFromEnode(final EnodeURL enodeURL) {
return this.nodeId(enodeURL.getNodeId())
.listeningPort(enodeURL.getListeningPort())
.discoveryPort(enodeURL.getDiscoveryPort())
.ipAddress(enodeURL.getIp());
}

public Builder nodeId(final Bytes nodeId) {
Expand Down