Skip to content

Commit

Permalink
feat: only dial subnet peers if needed (#5782)
Browse files Browse the repository at this point in the history
* fix: do not dial more peers if node has enough subnet peers

* fix: reorder conditions in shouldDialPeer
  • Loading branch information
twoeths authored Jul 26, 2023
1 parent 11bbca1 commit c112f01
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 15 deletions.
10 changes: 10 additions & 0 deletions packages/beacon-node/src/network/core/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
name: "lodestar_discovery_peers_to_connect",
help: "Current peers to connect count from discoverPeers requests",
}),
subnetPeersToConnect: register.gauge<"type">({
name: "lodestar_discovery_subnet_peers_to_connect",
help: "Current peers to connect count from discoverPeers requests",
labelNames: ["type"],
}),
subnetsToConnect: register.gauge<"type">({
name: "lodestar_discovery_subnets_to_connect",
help: "Current subnets to connect count from discoverPeers requests",
labelNames: ["type"],
}),
cachedENRsSize: register.gauge({
name: "lodestar_discovery_cached_enrs_size",
help: "Current size of the cachedENRs Set",
Expand Down
61 changes: 46 additions & 15 deletions packages/beacon-node/src/network/peers/discover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ enum DiscoveredPeerStatus {
}

/** Timestamp in milliseconds; values of this type are compared against `Date.now()`. */
type UnixMs = number;
/**
 * Per-subnet discovery request state.
 *
 * Maintain peersToConnect to avoid having too many topic peers at some point.
 * See https://github.com/ChainSafe/lodestar/issues/5741#issuecomment-1643113577
 */
type SubnetRequestInfo = {
  // Expiry of the request: once this is earlier than Date.now() the request is pruned.
  toUnixMs: UnixMs;
  // Number of peers still wanted for this subnet; a request whose count reaches 0 is pruned.
  // when node is stable this should be 0
  peersToConnect: number;
};

export type SubnetDiscvQueryMs = {
subnet: number;
type: SubnetType;
Expand Down Expand Up @@ -83,9 +93,9 @@ export class PeerDiscovery {
private cachedENRs = new Map<PeerIdStr, CachedENR>();
private randomNodeQuery: QueryStatus = {code: QueryStatusCode.NotActive};
private peersToConnect = 0;
private subnetRequests: Record<SubnetType, Map<number, UnixMs>> = {
private subnetRequests: Record<SubnetType, Map<number, SubnetRequestInfo>> = {
attnets: new Map(),
syncnets: new Map([[10, Date.now() + 2 * 60 * 60 * 1000]]),
syncnets: new Map(),
};

/** The maximum number of peers we allow (exceptions for subnet peers) */
Expand Down Expand Up @@ -134,6 +144,14 @@ export class PeerDiscovery {
metrics.discovery.cachedENRsSize.addCollect(() => {
metrics.discovery.cachedENRsSize.set(this.cachedENRs.size);
metrics.discovery.peersToConnect.set(this.peersToConnect);
for (const type of [SubnetType.attnets, SubnetType.syncnets]) {
const subnetPeersToConnect = Array.from(this.subnetRequests[type].values()).reduce(
(acc, {peersToConnect}) => acc + peersToConnect,
0
);
metrics.discovery.subnetPeersToConnect.set({type}, subnetPeersToConnect);
metrics.discovery.subnetsToConnect.set({type}, this.subnetRequests[type].size);
}
});
}
}
Expand Down Expand Up @@ -186,12 +204,6 @@ export class PeerDiscovery {
this.peersToConnect += peersToConnect;

subnet: for (const subnetRequest of subnetRequests) {
// Extend the toUnixMs for this subnet
const prevUnixMs = this.subnetRequests[subnetRequest.type].get(subnetRequest.subnet);
if (prevUnixMs === undefined || prevUnixMs < subnetRequest.toUnixMs) {
this.subnetRequests[subnetRequest.type].set(subnetRequest.subnet, subnetRequest.toUnixMs);
}

// Get cached ENRs from the discovery service that are in the requested `subnetId`, but not connected yet
let cachedENRsInSubnet = 0;
for (const cachedENR of cachedENRsReverse) {
Expand All @@ -204,6 +216,17 @@ export class PeerDiscovery {
}
}

const subnetPeersToConnect = Math.max(subnetRequest.maxPeersToDiscover - cachedENRsInSubnet, 0);

// Extend the toUnixMs for this subnet
const prevUnixMs = this.subnetRequests[subnetRequest.type].get(subnetRequest.subnet)?.toUnixMs;
const newUnixMs =
prevUnixMs !== undefined && prevUnixMs > subnetRequest.toUnixMs ? prevUnixMs : subnetRequest.toUnixMs;
this.subnetRequests[subnetRequest.type].set(subnetRequest.subnet, {
toUnixMs: newUnixMs,
peersToConnect: subnetPeersToConnect,
});

// Start a discv5 query if more peers are needed
subnetsToDiscoverPeers.push(subnetRequest);
}
Expand Down Expand Up @@ -372,23 +395,31 @@ export class PeerDiscovery {
}

private shouldDialPeer(peer: CachedENR): boolean {
if (this.peersToConnect > 0) {
return true;
}

for (const type of [SubnetType.attnets, SubnetType.syncnets]) {
for (const [subnet, toUnixMs] of this.subnetRequests[type].entries()) {
if (toUnixMs < Date.now()) {
// Prune all requests
for (const [subnet, {toUnixMs, peersToConnect}] of this.subnetRequests[type].entries()) {
if (toUnixMs < Date.now() || peersToConnect === 0) {
// Prune all requests so that we don't have to loop again
// if we have low subnet peers then PeerManager will update us again with subnet + toUnixMs + peersToConnect
this.subnetRequests[type].delete(subnet);
} else {
// not expired and peersToConnect > 0
// if we have enough subnet peers, no need to dial more or we may have performance issues
// see https://github.com/ChainSafe/lodestar/issues/5741#issuecomment-1643113577
if (peer.subnets[type][subnet]) {
this.subnetRequests[type].set(subnet, {toUnixMs, peersToConnect: Math.max(peersToConnect - 1, 0)});
return true;
}
}
}
}

// Ideally this cheap condition would sit at the top of the function,
// however we also want to update peersToConnect in this.subnetRequests first.
// Each this.subnetRequests[type] map gradually shrinks to 0 subnets, so this function should be cheap enough.
if (this.peersToConnect > 0) {
return true;
}

return false;
}

Expand Down

0 comments on commit c112f01

Please sign in to comment.