From 76e258be72511709da61b1e39e0faa64ddf4c1b2 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 18:02:17 +0000 Subject: [PATCH 01/11] feat: reorganise reqresp handlers --- .../p2p/src/services/libp2p/libp2p_service.ts | 2 ++ .../p2p/src/services/reqresp/interface.ts | 16 ++++++++++++---- .../src/services/reqresp/protocols/goodbye.ts | 9 +++++++++ .../p2p/src/services/reqresp/protocols/index.ts | 1 + .../p2p/src/services/reqresp/protocols/tx.ts | 4 ++-- 5 files changed, 26 insertions(+), 6 deletions(-) create mode 100644 yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index e302fe2cdb0..f2db7b8ae13 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -64,6 +64,7 @@ import { type ReqRespSubProtocolHandlers, type SubProtocolMap, } from '../reqresp/interface.js'; +import { goodbyeHandler } from '../reqresp/protocols/goodbye.js'; import { pingHandler, statusHandler } from '../reqresp/protocols/index.js'; import { reqRespTxHandler } from '../reqresp/protocols/tx.js'; import { ReqResp } from '../reqresp/reqresp.js'; @@ -341,6 +342,7 @@ export class LibP2PService extends WithTracer implement [ReqRespSubProtocol.PING]: pingHandler, [ReqRespSubProtocol.STATUS]: statusHandler, [ReqRespSubProtocol.TX]: txHandler, + [ReqRespSubProtocol.GOODBYE]: goodbyeHandler, }; return new LibP2PService( diff --git a/yarn-project/p2p/src/services/reqresp/interface.ts b/yarn-project/p2p/src/services/reqresp/interface.ts index 13efa4f055e..698f88bf319 100644 --- a/yarn-project/p2p/src/services/reqresp/interface.ts +++ b/yarn-project/p2p/src/services/reqresp/interface.ts @@ -5,14 +5,16 @@ import { type PeerId } from '@libp2p/interface'; /* * Request Response Sub Protocols */ -const PING_PROTOCOL = '/aztec/req/ping/0.1.0'; -const STATUS_PROTOCOL = '/aztec/req/status/0.1.0'; -const TX_PROTOCOL = '/aztec/req/tx/0.1.0'; +export const PING_PROTOCOL = '/aztec/req/ping/0.1.0'; +export const STATUS_PROTOCOL = '/aztec/req/status/0.1.0'; +export const GOODBYE_PROTOCOL = '/aztec/req/goodbye/0.1.0'; +export const TX_REQ_PROTOCOL = '/aztec/req/tx/0.1.0'; export enum ReqRespSubProtocol { PING = PING_PROTOCOL, STATUS = STATUS_PROTOCOL, - TX = TX_PROTOCOL, + GOODBYE = GOODBYE_PROTOCOL, + TX = TX_REQ_PROTOCOL, } /** @@ -72,6 +74,7 @@ export const DEFAULT_SUB_PROTOCOL_VALIDATORS: ReqRespSubProtocolValidators = { [ReqRespSubProtocol.PING]: noopValidator, [ReqRespSubProtocol.STATUS]: noopValidator, [ReqRespSubProtocol.TX]: noopValidator, + [ReqRespSubProtocol.GOODBYE]: noopValidator, }; /** @@ -97,6 +100,7 @@ export const DEFAULT_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = { [ReqRespSubProtocol.PING]: defaultHandler, [ReqRespSubProtocol.STATUS]: defaultHandler, [ReqRespSubProtocol.TX]: defaultHandler, + [ReqRespSubProtocol.GOODBYE]: defaultHandler, }; /** @@ -150,4 +154,8 @@ export const subProtocolMap: SubProtocolMap = { request: TxHash, response: Tx, }, + [ReqRespSubProtocol.GOODBYE]: { + request: RequestableBuffer, + response: RequestableBuffer, + }, }; diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts new file mode 100644 index 00000000000..bbc5a56cc06 --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts @@ -0,0 +1,9 @@ +// TODO: implement fully +/** + * Handles the goodbye request. + * @param _msg - The goodbye request message. + * @returns A resolved promise with the goodbye response. + */ +export function goodbyeHandler(_msg: any): Promise { + return Promise.resolve(Buffer.from('goodbye')); +} diff --git a/yarn-project/p2p/src/services/reqresp/protocols/index.ts b/yarn-project/p2p/src/services/reqresp/protocols/index.ts index 51902f46f3d..ffc009fe37a 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/index.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/index.ts @@ -4,3 +4,4 @@ export * from './ping.js'; export * from './status.js'; export * from './tx.js'; +export * from './goodbye.js'; diff --git a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts index 9aabc5a8185..beed7c45836 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts @@ -1,7 +1,7 @@ -import { type P2PClientType } from '@aztec/circuit-types'; +import { P2PClientType } from '@aztec/circuit-types'; import { TxHash } from '@aztec/circuit-types/tx_hash'; -import { type MemPools } from '../../../mem_pools/interface.js'; +import { MemPools } from '../../../mem_pools/interface.js'; /** * We want to keep the logic of the req resp handler in this file, but we do not have a reference to the mempools here From 5401acabbe887d1fd183f9390edcfc7f21a86f85 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 19:29:27 +0000 Subject: [PATCH 02/11] fix: complete enum migration --- yarn-project/p2p/src/mocks/index.ts | 5 +++-- .../src/services/reqresp/rate_limiter/rate_limits.ts | 11 +++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/mocks/index.ts index 6f598a5f1d6..39c589fff99 100644 --- a/yarn-project/p2p/src/mocks/index.ts +++ b/yarn-project/p2p/src/mocks/index.ts @@ -34,8 +34,7 @@ import { type ReqRespSubProtocolValidators, noopValidator, } from '../services/reqresp/interface.js'; -import { pingHandler } from '../services/reqresp/protocols/ping.js'; -import { statusHandler } from '../services/reqresp/protocols/status.js'; +import { pingHandler, statusHandler } from '../services/reqresp/protocols/index.js'; import { ReqResp } from '../services/reqresp/reqresp.js'; import { type PubSubLibp2p } from '../util.js'; @@ -153,6 +152,7 @@ export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = { [ReqRespSubProtocol.PING]: pingHandler, [ReqRespSubProtocol.STATUS]: statusHandler, [ReqRespSubProtocol.TX]: (_msg: any) => Promise.resolve(Buffer.from('tx')), + [ReqRespSubProtocol.GOODBYE]: (_msg: any) => Promise.resolve(Buffer.from('goodbye')), }; // By default, all requests are valid @@ -161,6 +161,7 @@ export const MOCK_SUB_PROTOCOL_VALIDATORS: ReqRespSubProtocolValidators = { [ReqRespSubProtocol.PING]: noopValidator, [ReqRespSubProtocol.STATUS]: noopValidator, [ReqRespSubProtocol.TX]: noopValidator, + [ReqRespSubProtocol.GOODBYE]: noopValidator, }; /** diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limits.ts b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limits.ts index 11c46c59551..23bb99dad7a 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limits.ts +++ b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limits.ts @@ -32,4 +32,15 @@ export const DEFAULT_RATE_LIMITS: ReqRespSubProtocolRateLimits = { quotaCount: 10, }, }, + // TODO(md): feels like goodbye is an exception to the rule here + [ReqRespSubProtocol.GOODBYE]: { + peerLimit: { + quotaTimeMs: 1000, + quotaCount: 5, + }, + globalLimit: { + quotaTimeMs: 1000, + quotaCount: 10, + }, + }, }; From b44d271d29cec8bf118d760b2aeed7b7a4e490cd Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 20:59:00 +0000 Subject: [PATCH 03/11] fmt --- .../p2p/src/services/libp2p/libp2p_service.ts | 4 ++-- yarn-project/p2p/src/services/peer_manager.ts | 10 +++++++--- .../src/services/reqresp/protocols/goodbye.ts | 17 +++++++++++++++++ .../p2p/src/services/reqresp/protocols/tx.ts | 4 ++-- 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index f2db7b8ae13..48f269b1bc3 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -116,10 +116,10 @@ export class LibP2PService extends WithTracer implement private peerDiscoveryService: PeerDiscoveryService, private mempools: MemPools, private l2BlockSource: L2BlockSource, - private epochCache: EpochCache, + epochCache: EpochCache, private proofVerifier: ClientProtocolCircuitVerifier, private worldStateSynchronizer: WorldStateSynchronizer, - private telemetry: TelemetryClient, + telemetry: TelemetryClient, private requestResponseHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS, private logger = createLogger('p2p:libp2p_service'), ) { diff --git a/yarn-project/p2p/src/services/peer_manager.ts b/yarn-project/p2p/src/services/peer_manager.ts index 59052dfb9ad..7eaff483144 100644 --- a/yarn-project/p2p/src/services/peer_manager.ts +++ b/yarn-project/p2p/src/services/peer_manager.ts @@ -10,6 +10,7 @@ import { inspect } from 'util'; import { type P2PConfig } from '../config.js'; import { type PubSubLibp2p } from '../util.js'; import { PeerScoreState, PeerScoring } from './peer-scoring/peer_scoring.js'; +import { GoodByeReason } from './reqresp/protocols/goodbye.js'; import { type PeerDiscoveryService } from './service.js'; import { PeerEvent } from './types.js'; @@ -229,8 +230,10 @@ export class PeerManager extends WithTracer { switch (score) { // TODO: add goodbye and give reasons case PeerScoreState.Banned: + void this.disconnectPeer(peer.remotePeer, GoodByeReason.BANNED); + break; case PeerScoreState.Disconnect: - void this.disconnectPeer(peer.remotePeer); + void this.disconnectPeer(peer.remotePeer, GoodByeReason.DISCONNECTED); break; case PeerScoreState.Healthy: connectedHealthyPeers.push(peer); @@ -241,8 +244,9 @@ export class PeerManager extends WithTracer { } // TODO: send a goodbye with a reason to the peer - private async disconnectPeer(peer: PeerId) { - this.logger.debug(`Disconnecting peer ${peer.toString()}`); + private async disconnectPeer(peer: PeerId, reason: GoodByeReason) { + this.logger.debug(`Disconnecting peer ${peer.toString()} with reason ${reason}`); + await this.libP2PNode.hangUp(peer); } diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts index bbc5a56cc06..b272aa09654 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts @@ -1,4 +1,21 @@ // TODO: implement fully + +/** + * Enum defining the possible reasons for a goodbye message. + */ +export enum GoodByeReason { + /** The peer has shutdown, will be received whenever a peer's node is routinely stopped */ + SHUTDOWN = 0x1, + // TOOD(md): what is the correct values to put in here - read other specs to see reasons + // what is even the point of the reason + /** Whenever the peer must disconnect due to maintaining max peers */ + DISCONNECTED = 0x2, + /** The peer has a low score, will be received whenever a peer's score is low */ + LOW_SCORE = 0x3, + /** The peer has been banned, will be received whenever a peer is banned */ + BANNED = 0x4, +} + /** * Handles the goodbye request. * @param _msg - The goodbye request message. diff --git a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts index beed7c45836..9aabc5a8185 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts @@ -1,7 +1,7 @@ -import { P2PClientType } from '@aztec/circuit-types'; +import { type P2PClientType } from '@aztec/circuit-types'; import { TxHash } from '@aztec/circuit-types/tx_hash'; -import { MemPools } from '../../../mem_pools/interface.js'; +import { type MemPools } from '../../../mem_pools/interface.js'; /** * We want to keep the logic of the req resp handler in this file, but we do not have a reference to the mempools here From e68436b7647769a7e37ddcf375a5c1fb718e1d12 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 21:33:27 +0000 Subject: [PATCH 04/11] fix: make peer scoring a dep of peer manager, to avoid circular dependency with reqresp --- .../p2p/src/services/libp2p/libp2p_service.ts | 21 +++++++++++----- .../src/services/peer-scoring/peer_scoring.ts | 20 ++++++++++++--- yarn-project/p2p/src/services/peer_manager.ts | 17 ++++++++----- .../src/services/reqresp/protocols/goodbye.ts | 25 +++++++++++++++++++ .../reqresp/protocols/goodbye_protocol.ts | 0 .../reqresp/rate_limiter/rate_limiter.ts | 6 ++--- .../p2p/src/services/reqresp/reqresp.ts | 10 ++++---- 7 files changed, 75 insertions(+), 24 deletions(-) create mode 100644 yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 48f269b1bc3..e74e5c7e27d 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -56,6 +56,7 @@ import { import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js'; import { AztecDatastore } from '../data_store.js'; import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from '../encoding.js'; +import { PeerScoring } from '../peer-scoring/peer_scoring.js'; import { PeerManager } from '../peer_manager.js'; import { DEFAULT_SUB_PROTOCOL_HANDLERS, @@ -65,6 +66,8 @@ import { type SubProtocolMap, } from '../reqresp/interface.js'; import { goodbyeHandler } from '../reqresp/protocols/goodbye.js'; +import { GoodByeReason } from '../reqresp/protocols/goodbye.js'; +import { GoodbyeProtocolHandler } from '../reqresp/protocols/goodbye_protocol.js'; import { pingHandler, statusHandler } from '../reqresp/protocols/index.js'; import { reqRespTxHandler } from '../reqresp/protocols/tx.js'; import { ReqResp } from '../reqresp/reqresp.js'; @@ -125,12 +128,18 @@ export class LibP2PService extends WithTracer implement ) { super(telemetry, 'LibP2PService'); - this.peerManager = new PeerManager(node, peerDiscoveryService, config, telemetry, logger); - this.node.services.pubsub.score.params.appSpecificScore = (peerId: string) => { - return this.peerManager.getPeerScore(peerId); - }; - this.node.services.pubsub.score.params.appSpecificWeight = 10; - this.reqresp = new ReqResp(config, node, this.peerManager); + const peerScoring = new PeerScoring(config); + this.reqresp = new ReqResp(config, node, peerScoring); + + this.peerManager = new PeerManager( + node, + peerDiscoveryService, + config, + telemetry, + logger, + peerScoring, + this.reqresp, + ); this.attestationValidator = new AttestationValidator(epochCache); this.blockProposalValidator = new BlockProposalValidator(epochCache); diff --git a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts b/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts index 34233435b86..79af6fd3325 100644 --- a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts +++ b/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts @@ -1,5 +1,8 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { median } from '@aztec/foundation/collection'; +import { createLogger } from '@aztec/foundation/log'; + +import { PeerId } from '@libp2p/interface'; import { type P2PConfig } from '../../config.js'; @@ -20,6 +23,7 @@ const MIN_SCORE_BEFORE_BAN = -100; const MIN_SCORE_BEFORE_DISCONNECT = -50; export class PeerScoring { + private logger = createLogger('p2p:peer-scoring'); private scores: Map = new Map(); private lastUpdateTime: Map = new Map(); private decayInterval = 1000 * 60; // 1 minute @@ -38,6 +42,14 @@ export class PeerScoring { }; } + public penalizePeer(peerId: PeerId, penalty: PeerErrorSeverity) { + const id = peerId.toString(); + const penaltyValue = this.peerPenalties[penalty]; + const newScore = this.updateScore(id, -penaltyValue); + this.logger.verbose(`Penalizing peer ${id} with ${penalty} (new score is ${newScore})`); + return newScore; + } + updateScore(peerId: string, scoreDelta: number): number { const currentTime = Date.now(); const lastUpdate = this.lastUpdateTime.get(peerId) || currentTime; @@ -75,12 +87,12 @@ export class PeerScoring { return this.scores.get(peerId) || 0; } - getScoreState(peerId: string) { - // TODO: permanently store banned peers??? + public getScoreState(peerId: string): PeerScoreState { const score = this.getScore(peerId); - if (score < MIN_SCORE_BEFORE_BAN) { + if (score <= MIN_SCORE_BEFORE_BAN) { return PeerScoreState.Banned; - } else if (score < MIN_SCORE_BEFORE_DISCONNECT) { + } + if (score <= MIN_SCORE_BEFORE_DISCONNECT) { return PeerScoreState.Disconnect; } return PeerScoreState.Healthy; diff --git a/yarn-project/p2p/src/services/peer_manager.ts b/yarn-project/p2p/src/services/peer_manager.ts index 7eaff483144..6a4b20a2e50 100644 --- a/yarn-project/p2p/src/services/peer_manager.ts +++ b/yarn-project/p2p/src/services/peer_manager.ts @@ -10,7 +10,9 @@ import { inspect } from 'util'; import { type P2PConfig } from '../config.js'; import { type PubSubLibp2p } from '../util.js'; import { PeerScoreState, PeerScoring } from './peer-scoring/peer_scoring.js'; +import { type GoodbyeSender } from './reqresp/goodbye_sender.js'; import { GoodByeReason } from './reqresp/protocols/goodbye.js'; +import { ReqResp, ReqRespSubProtocol } from './reqresp/reqresp.js'; import { type PeerDiscoveryService } from './service.js'; import { PeerEvent } from './types.js'; @@ -34,7 +36,6 @@ type TimedOutPeer = { export class PeerManager extends WithTracer { private cachedPeers: Map = new Map(); - private peerScoring: PeerScoring; private heartbeatCounter: number = 0; private displayPeerCountsPeerHeartbeat: number = 0; private timedOutPeers: Map = new Map(); @@ -45,10 +46,11 @@ export class PeerManager extends WithTracer { private config: P2PConfig, telemetryClient: TelemetryClient, private logger = createLogger('p2p:peer-manager'), + private peerScoring: PeerScoring, + private reqresp: ReqResp, ) { super(telemetryClient, 'PeerManager'); - this.peerScoring = new PeerScoring(config); // Handle new established connections this.libP2PNode.addEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent.bind(this)); // Handle lost connections @@ -115,10 +117,7 @@ export class PeerManager extends WithTracer { } public penalizePeer(peerId: PeerId, penalty: PeerErrorSeverity) { - const id = peerId.toString(); - const penaltyValue = this.peerScoring.peerPenalties[penalty]; - const newScore = this.peerScoring.updateScore(id, -penaltyValue); - this.logger.verbose(`Penalizing peer ${id} with ${penalty} (new score is ${newScore})`); + this.peerScoring.penalizePeer(peerId.toString(), penalty); } public getPeerScore(peerId: string): number { @@ -247,6 +246,12 @@ export class PeerManager extends WithTracer { private async disconnectPeer(peer: PeerId, reason: GoodByeReason) { this.logger.debug(`Disconnecting peer ${peer.toString()} with reason ${reason}`); + try { + await this.reqresp.sendRequestToPeer(peer, ReqRespSubProtocol.GOODBYE, Buffer.from([reason])); + } catch (error) { + this.logger.debug(`Failed to send goodbye to peer ${peer.toString()}: ${error}`); + } + await this.libP2PNode.hangUp(peer); } diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts index b272aa09654..9440a2cc003 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts @@ -1,3 +1,10 @@ +import { createLogger } from '@aztec/foundation/log'; + +import { type PeerId } from '@libp2p/interface'; + +import { ReqRespSubProtocol } from '../interface.js'; +import { type ReqResp } from '../reqresp.js'; + // TODO: implement fully /** @@ -16,6 +23,24 @@ export enum GoodByeReason { BANNED = 0x4, } +/** + * Handles a goodbye message request + */ +export class GoodbyeProtocolHandler { + private logger = createLogger('p2p:goodbye-protocol'); + + constructor(private reqresp: ReqResp) {} + + public async sendGoodbye(peerId: PeerId, reason: GoodByeReason): Promise { + try { + await this.reqresp.sendRequestToPeer(peerId, ReqRespSubProtocol.GOODBYE, Buffer.from([reason])); + this.logger.debug(`Sent goodbye to peer ${peerId.toString()} with reason ${reason}`); + } catch (error) { + this.logger.debug(`Failed to send goodbye to peer ${peerId.toString()}: ${error}`); + } + } +} + /** * Handles the goodbye request. * @param _msg - The goodbye request message. diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts new file mode 100644 index 00000000000..e69de29bb2d diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts index 495aea8bac2..6ae74dcc5fa 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts +++ b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts @@ -7,7 +7,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { type PeerId } from '@libp2p/interface'; -import { type PeerManager } from '../../peer_manager.js'; +import { PeerScoring } from '../../peer-scoring/peer_scoring.js'; import { type ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js'; import { DEFAULT_RATE_LIMITS } from './rate_limits.js'; @@ -169,7 +169,7 @@ export class RequestResponseRateLimiter { private cleanupInterval: NodeJS.Timeout | undefined = undefined; - constructor(private peerManager: PeerManager, rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS) { + constructor(private peerScoring: PeerScoring, rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS) { this.subProtocolRateLimiters = new Map(); for (const [subProtocol, protocolLimits] of Object.entries(rateLimits)) { @@ -200,7 +200,7 @@ export class RequestResponseRateLimiter { switch (rateLimitStatus) { case RateLimitStatus.DeniedPeer: - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); return false; case RateLimitStatus.DeniedGlobal: return false; diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index e6b8d386c9c..5a905ec8723 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -14,7 +14,7 @@ import { InvalidResponseError, } from '../../errors/reqresp.error.js'; import { SnappyTransform } from '../encoding.js'; -import { type PeerManager } from '../peer_manager.js'; +import { PeerScoring } from '../peer-scoring/peer_scoring.js'; import { type P2PReqRespConfig } from './config.js'; import { DEFAULT_SUB_PROTOCOL_HANDLERS, @@ -55,13 +55,13 @@ export class ReqResp { private snappyTransform: SnappyTransform; - constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p, private peerManager: PeerManager) { + constructor(config: P2PReqRespConfig, private libp2p: Libp2p, private peerScoring: PeerScoring) { this.logger = createLogger('p2p:reqresp'); this.overallRequestTimeoutMs = config.overallRequestTimeoutMs; this.individualRequestTimeoutMs = config.individualRequestTimeoutMs; - this.rateLimiter = new RequestResponseRateLimiter(peerManager); + this.rateLimiter = new RequestResponseRateLimiter(peerScoring); this.snappyTransform = new SnappyTransform(); } @@ -194,7 +194,7 @@ export class ReqResp { * If the stream is not closed by the dialled peer, and a timeout occurs, then * the stream is closed on the requester's end and sender (us) updates its peer score */ - async sendRequestToPeer( + public async sendRequestToPeer( peerId: PeerId, subProtocol: ReqRespSubProtocol, payload: Buffer, @@ -241,7 +241,7 @@ export class ReqResp { private handleResponseError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): void { const severity = this.categorizeError(e, peerId, subProtocol); if (severity) { - this.peerManager.penalizePeer(peerId, severity); + this.peerScoring.penalizePeer(peerId, severity); } } From e78a9cf164826d24c6d2290eaf39efd94c1f0135 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 22:18:08 +0000 Subject: [PATCH 05/11] feat: send goodbye messages to peers on shutdown --- yarn-project/p2p/src/mocks/index.ts | 10 +- .../p2p/src/services/libp2p/libp2p_service.ts | 213 +++++++++--------- .../src/services/peer-scoring/peer_scoring.ts | 2 +- .../p2p/src/services/peer_manager.test.ts | 33 ++- yarn-project/p2p/src/services/peer_manager.ts | 44 ++-- .../p2p/src/services/reqresp/interface.ts | 2 +- .../src/services/reqresp/protocols/goodbye.ts | 30 ++- .../p2p/src/services/reqresp/protocols/tx.ts | 7 +- .../reqresp/rate_limiter/rate_limiter.test.ts | 13 +- .../reqresp/rate_limiter/rate_limiter.ts | 2 +- .../p2p/src/services/reqresp/reqresp.test.ts | 58 +++-- .../p2p/src/services/reqresp/reqresp.ts | 4 +- 12 files changed, 250 insertions(+), 168 deletions(-) diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/mocks/index.ts index 39c589fff99..af8366906d5 100644 --- a/yarn-project/p2p/src/mocks/index.ts +++ b/yarn-project/p2p/src/mocks/index.ts @@ -26,7 +26,7 @@ import { type BootnodeConfig, type P2PConfig } from '../config.js'; import { type MemPools } from '../mem_pools/interface.js'; import { DiscV5Service } from '../services/discv5/discV5_service.js'; import { LibP2PService } from '../services/libp2p/libp2p_service.js'; -import { type PeerManager } from '../services/peer_manager.js'; +import { type PeerScoring } from '../services/peer-scoring/peer_scoring.js'; import { type P2PReqRespConfig } from '../services/reqresp/config.js'; import { ReqRespSubProtocol, @@ -168,8 +168,8 @@ export const MOCK_SUB_PROTOCOL_VALIDATORS: ReqRespSubProtocolValidators = { * @param numberOfNodes - the number of nodes to create * @returns An array of the created nodes */ -export const createNodes = async (peerManager: PeerManager, numberOfNodes: number): Promise => { - return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp(peerManager))); +export const createNodes = async (peerScoring: PeerScoring, numberOfNodes: number): Promise => { + return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp(peerScoring))); }; export const startNodes = async ( @@ -192,13 +192,13 @@ export const stopNodes = async (nodes: ReqRespNode[]): Promise => { }; // Create a req resp node, exposing the underlying p2p node -export const createReqResp = async (peerManager: PeerManager): Promise => { +export const createReqResp = async (peerScoring: PeerScoring): Promise => { const p2p = await createLibp2pNode(); const config: P2PReqRespConfig = { overallRequestTimeoutMs: 4000, individualRequestTimeoutMs: 2000, }; - const req = new ReqResp(config, p2p, peerManager); + const req = new ReqResp(config, p2p, peerScoring); return { p2p, req, diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index e74e5c7e27d..8348c2279db 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -58,16 +58,8 @@ import { AztecDatastore } from '../data_store.js'; import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from '../encoding.js'; import { PeerScoring } from '../peer-scoring/peer_scoring.js'; import { PeerManager } from '../peer_manager.js'; -import { - DEFAULT_SUB_PROTOCOL_HANDLERS, - DEFAULT_SUB_PROTOCOL_VALIDATORS, - ReqRespSubProtocol, - type ReqRespSubProtocolHandlers, - type SubProtocolMap, -} from '../reqresp/interface.js'; -import { goodbyeHandler } from '../reqresp/protocols/goodbye.js'; -import { GoodByeReason } from '../reqresp/protocols/goodbye.js'; -import { GoodbyeProtocolHandler } from '../reqresp/protocols/goodbye_protocol.js'; +import { DEFAULT_SUB_PROTOCOL_VALIDATORS, ReqRespSubProtocol, type SubProtocolMap } from '../reqresp/interface.js'; +import { reqGoodbyeHandler } from '../reqresp/protocols/goodbye.js'; import { pingHandler, statusHandler } from '../reqresp/protocols/index.js'; import { reqRespTxHandler } from '../reqresp/protocols/tx.js'; import { ReqResp } from '../reqresp/reqresp.js'; @@ -123,7 +115,6 @@ export class LibP2PService extends WithTracer implement private proofVerifier: ClientProtocolCircuitVerifier, private worldStateSynchronizer: WorldStateSynchronizer, telemetry: TelemetryClient, - private requestResponseHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS, private logger = createLogger('p2p:libp2p_service'), ) { super(telemetry, 'LibP2PService'); @@ -153,95 +144,6 @@ export class LibP2PService extends WithTracer implement }; } - /** - * Starts the LibP2P service. - * @returns An empty promise. - */ - public async start() { - // Check if service is already started - if (this.node.status === 'started') { - throw new Error('P2P service already started'); - } - - // Get listen & announce addresses for logging - const { tcpListenAddress, tcpAnnounceAddress } = this.config; - if (!tcpAnnounceAddress) { - throw new Error('Announce address not provided.'); - } - const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp'); - - // Start job queue, peer discovery service and libp2p node - this.jobQueue.start(); - await this.peerDiscoveryService.start(); - await this.node.start(); - - // Subscribe to standard GossipSub topics by default - for (const topic of getTopicTypeForClientType(this.clientType)) { - this.subscribeToTopic(TopicTypeMap[topic].p2pTopic); - } - - // Add p2p topic validators - // As they are stored within a kv pair, there is no need to register them conditionally - // based on the client type - const topicValidators = { - [Tx.p2pTopic]: this.validatePropagatedTxFromMessage.bind(this), - [BlockAttestation.p2pTopic]: this.validatePropagatedAttestationFromMessage.bind(this), - [BlockProposal.p2pTopic]: this.validatePropagatedBlockFromMessage.bind(this), - [EpochProofQuote.p2pTopic]: this.validatePropagatedEpochProofQuoteFromMessage.bind(this), - }; - for (const [topic, validator] of Object.entries(topicValidators)) { - this.node.services.pubsub.topicValidators.set(topic, validator); - } - - // add GossipSub listener - this.node.services.pubsub.addEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this)); - - // Start running promise for peer discovery - this.discoveryRunningPromise = new RunningPromise( - () => this.peerManager.heartbeat(), - this.logger, - this.config.peerCheckIntervalMS, - ); - this.discoveryRunningPromise.start(); - - // Define the sub protocol validators - This is done within this start() method to gain a callback to the existing validateTx function - const reqrespSubProtocolValidators = { - ...DEFAULT_SUB_PROTOCOL_VALIDATORS, - [ReqRespSubProtocol.TX]: this.validateRequestedTx.bind(this), - }; - await this.reqresp.start(this.requestResponseHandlers, reqrespSubProtocolValidators); - this.logger.info(`Started P2P service`, { - listen: tcpListenAddress, - announce: announceTcpMultiaddr, - peerId: this.node.peerId.toString(), - }); - } - - /** - * Stops the LibP2P service. - * @returns An empty promise. - */ - public async stop() { - // Remove gossip sub listener - this.node.services.pubsub.removeEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this)); - - // Stop peer manager - this.logger.debug('Stopping peer manager...'); - this.peerManager.stop(); - - this.logger.debug('Stopping job queue...'); - await this.jobQueue.end(); - this.logger.debug('Stopping running promise...'); - await this.discoveryRunningPromise?.stop(); - this.logger.debug('Stopping peer discovery service...'); - await this.peerDiscoveryService.stop(); - this.logger.debug('Request response service stopped...'); - await this.reqresp.stop(); - this.logger.debug('Stopping LibP2P...'); - await this.stopLibP2P(); - this.logger.info('LibP2P service stopped'); - } - /** * Creates an instance of the LibP2P service. * @param config - The configuration to use when creating the service. @@ -344,16 +246,6 @@ export class LibP2PService extends WithTracer implement }, }); - // Create request response protocol handlers - const txHandler = reqRespTxHandler(mempools); - - const requestResponseHandlers = { - [ReqRespSubProtocol.PING]: pingHandler, - [ReqRespSubProtocol.STATUS]: statusHandler, - [ReqRespSubProtocol.TX]: txHandler, - [ReqRespSubProtocol.GOODBYE]: goodbyeHandler, - }; - return new LibP2PService( clientType, config, @@ -365,10 +257,109 @@ export class LibP2PService extends WithTracer implement proofVerifier, worldStateSynchronizer, telemetry, - requestResponseHandlers, ); } + /** + * Starts the LibP2P service. + * @returns An empty promise. + */ + public async start() { + // Check if service is already started + if (this.node.status === 'started') { + throw new Error('P2P service already started'); + } + + // Get listen & announce addresses for logging + const { tcpListenAddress, tcpAnnounceAddress } = this.config; + if (!tcpAnnounceAddress) { + throw new Error('Announce address not provided.'); + } + const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp'); + + // Start job queue, peer discovery service and libp2p node + this.jobQueue.start(); + await this.peerDiscoveryService.start(); + await this.node.start(); + + // Subscribe to standard GossipSub topics by default + for (const topic of getTopicTypeForClientType(this.clientType)) { + this.subscribeToTopic(TopicTypeMap[topic].p2pTopic); + } + + // Create request response protocol handlers + const txHandler = reqRespTxHandler(this.mempools); + const goodbyeHandler = reqGoodbyeHandler(this.peerManager); + + const requestResponseHandlers = { + [ReqRespSubProtocol.PING]: pingHandler, + [ReqRespSubProtocol.STATUS]: statusHandler, + [ReqRespSubProtocol.TX]: txHandler.bind(this), + [ReqRespSubProtocol.GOODBYE]: goodbyeHandler.bind(this), + }; + + // Add p2p topic validators + // As they are stored within a kv pair, there is no need to register them conditionally + // based on the client type + const topicValidators = { + [Tx.p2pTopic]: this.validatePropagatedTxFromMessage.bind(this), + [BlockAttestation.p2pTopic]: this.validatePropagatedAttestationFromMessage.bind(this), + [BlockProposal.p2pTopic]: this.validatePropagatedBlockFromMessage.bind(this), + [EpochProofQuote.p2pTopic]: this.validatePropagatedEpochProofQuoteFromMessage.bind(this), + }; + for (const [topic, validator] of Object.entries(topicValidators)) { + this.node.services.pubsub.topicValidators.set(topic, validator); + } + + // add GossipSub listener + this.node.services.pubsub.addEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this)); + + // Start running promise for peer discovery + this.discoveryRunningPromise = new RunningPromise( + () => this.peerManager.heartbeat(), + this.logger, + this.config.peerCheckIntervalMS, + ); + this.discoveryRunningPromise.start(); + + // Define the sub protocol validators - This is done within this start() method to gain a callback to the existing validateTx function + const reqrespSubProtocolValidators = { + ...DEFAULT_SUB_PROTOCOL_VALIDATORS, + [ReqRespSubProtocol.TX]: this.validateRequestedTx.bind(this), + }; + await this.reqresp.start(requestResponseHandlers, reqrespSubProtocolValidators); + this.logger.info(`Started P2P service`, { + listen: tcpListenAddress, + announce: announceTcpMultiaddr, + peerId: this.node.peerId.toString(), + }); + } + + /** + * Stops the LibP2P service. + * @returns An empty promise. + */ + public async stop() { + // Remove gossip sub listener + this.node.services.pubsub.removeEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this)); + + // Stop peer manager + this.logger.debug('Stopping peer manager...'); + await this.peerManager.stop(); + + this.logger.debug('Stopping job queue...'); + await this.jobQueue.end(); + this.logger.debug('Stopping running promise...'); + await this.discoveryRunningPromise?.stop(); + this.logger.debug('Stopping peer discovery service...'); + await this.peerDiscoveryService.stop(); + this.logger.debug('Request response service stopped...'); + await this.reqresp.stop(); + this.logger.debug('Stopping LibP2P...'); + await this.stopLibP2P(); + this.logger.info('LibP2P service stopped'); + } + public getPeers(includePending?: boolean): PeerInfo[] { return this.peerManager.getPeers(includePending); } diff --git a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts b/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts index 79af6fd3325..9fe98041caf 100644 --- a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts +++ b/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts @@ -2,7 +2,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { median } from '@aztec/foundation/collection'; import { createLogger } from '@aztec/foundation/log'; -import { PeerId } from '@libp2p/interface'; +import { type PeerId } from '@libp2p/interface'; import { type P2PConfig } from '../../config.js'; diff --git a/yarn-project/p2p/src/services/peer_manager.test.ts b/yarn-project/p2p/src/services/peer_manager.test.ts index dda028e647d..92b2012e167 100644 --- a/yarn-project/p2p/src/services/peer_manager.test.ts +++ b/yarn-project/p2p/src/services/peer_manager.test.ts @@ -8,8 +8,11 @@ import { jest } from '@jest/globals'; import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; import { multiaddr } from '@multiformats/multiaddr'; -import { getP2PDefaultConfig } from '../config.js'; +import { type P2PConfig, getP2PDefaultConfig } from '../config.js'; +import { PeerScoring } from './peer-scoring/peer_scoring.js'; import { PeerManager } from './peer_manager.js'; +import { ReqRespSubProtocol } from './reqresp/interface.js'; +import { GoodByeReason } from './reqresp/protocols/index.js'; import { PeerEvent } from './types.js'; describe('PeerManager', () => { @@ -33,6 +36,12 @@ describe('PeerManager', () => { runRandomNodesQuery: jest.fn(), }; + const mockReqResp: any = { + sendRequestToPeer: jest.fn(), + }; + + let peerScoring: PeerScoring; + let peerManager: PeerManager; // The function provided to the discovery servive callback will be run here let discoveredPeerCallback: (enr: ENR) => Promise; @@ -53,12 +62,15 @@ describe('PeerManager', () => { } }); + peerScoring = new PeerScoring({} as P2PConfig); peerManager = new PeerManager( mockLibP2PNode, mockPeerDiscoveryService, getP2PDefaultConfig(), getTelemetryClient(), createLogger('test'), + peerScoring, + mockReqResp, ); }); @@ -289,10 +301,20 @@ describe('PeerManager', () => { await sleep(100); - // Verify that hangUp was called for both unhealthy peers + // Verify that hangUp and a goodbye was sent for both unhealthy peers expect(mockLibP2PNode.hangUp).toHaveBeenCalledWith(bannedPeerId); - expect(mockLibP2PNode.hangUp).toHaveBeenCalledWith(disconnectPeerId); + expect(mockReqResp.sendRequestToPeer).toHaveBeenCalledWith( + bannedPeerId, + ReqRespSubProtocol.GOODBYE, + Buffer.from([GoodByeReason.BANNED]), + ); + expect(mockLibP2PNode.hangUp).toHaveBeenCalledWith(disconnectPeerId); + expect(mockReqResp.sendRequestToPeer).toHaveBeenCalledWith( + disconnectPeerId, + ReqRespSubProtocol.GOODBYE, + Buffer.from([GoodByeReason.DISCONNECTED]), + ); // Verify that hangUp was not called for the healthy peer expect(mockLibP2PNode.hangUp).not.toHaveBeenCalledWith(healthyPeerId); @@ -304,11 +326,14 @@ describe('PeerManager', () => { const enr = await createMockENR(); await discoveredPeerCallback(enr); - peerManager.stop(); + await peerManager.stop(); expect(mockLibP2PNode.removeEventListener).toHaveBeenCalledWith(PeerEvent.CONNECTED, expect.any(Function)); expect(mockLibP2PNode.removeEventListener).toHaveBeenCalledWith(PeerEvent.DISCONNECTED, expect.any(Function)); expect(mockPeerDiscoveryService.off).toHaveBeenCalledWith(PeerEvent.DISCOVERED, expect.any(Function)); + + // Verify that goodbyes were sent to all peers + expect(mockReqResp.sendRequestToPeer).toHaveBeenCalledTimes(2); }); }); }); diff --git a/yarn-project/p2p/src/services/peer_manager.ts b/yarn-project/p2p/src/services/peer_manager.ts index 6a4b20a2e50..99bc0a196ac 100644 --- a/yarn-project/p2p/src/services/peer_manager.ts +++ b/yarn-project/p2p/src/services/peer_manager.ts @@ -9,10 +9,10 @@ import { inspect } from 'util'; import { type P2PConfig } from '../config.js'; import { type PubSubLibp2p } from '../util.js'; -import { PeerScoreState, PeerScoring } from './peer-scoring/peer_scoring.js'; -import { type GoodbyeSender } from './reqresp/goodbye_sender.js'; -import { GoodByeReason } from './reqresp/protocols/goodbye.js'; -import { ReqResp, ReqRespSubProtocol } from './reqresp/reqresp.js'; +import { PeerScoreState, type PeerScoring } from './peer-scoring/peer_scoring.js'; +import { ReqRespSubProtocol } from './reqresp/interface.js'; +import { GoodByeReason, prettyGoodbyeReason } from './reqresp/protocols/goodbye.js'; +import { type ReqResp } from './reqresp/reqresp.js'; import { type PeerDiscoveryService } from './service.js'; import { PeerEvent } from './types.js'; @@ -116,8 +116,15 @@ export class PeerManager extends WithTracer { } } + // TODO: include reason here and add to metrics, but this is fine for now + public goodbyeReceived(peerId: PeerId) { + this.logger.debug(`Goodbye received from peer ${peerId.toString()}`); + + void this.disconnectPeer(peerId); + } + public penalizePeer(peerId: PeerId, penalty: PeerErrorSeverity) { - this.peerScoring.penalizePeer(peerId.toString(), penalty); + this.peerScoring.penalizePeer(peerId, penalty); } public getPeerScore(peerId: string): number { @@ -227,12 +234,11 @@ export class PeerManager extends WithTracer { for (const peer of connections) { const score = this.peerScoring.getScoreState(peer.remotePeer.toString()); switch (score) { - // TODO: add goodbye and give reasons case PeerScoreState.Banned: - void this.disconnectPeer(peer.remotePeer, GoodByeReason.BANNED); + void this.goodbyeAndDisconnectPeer(peer.remotePeer, GoodByeReason.BANNED); break; case PeerScoreState.Disconnect: - void this.disconnectPeer(peer.remotePeer, GoodByeReason.DISCONNECTED); + void this.goodbyeAndDisconnectPeer(peer.remotePeer, GoodByeReason.DISCONNECTED); break; case PeerScoreState.Healthy: connectedHealthyPeers.push(peer); @@ -242,17 +248,24 @@ export class PeerManager extends WithTracer { return connectedHealthyPeers; } - // TODO: send a goodbye with a reason to the peer - private async disconnectPeer(peer: PeerId, reason: GoodByeReason) { - this.logger.debug(`Disconnecting peer ${peer.toString()} with reason ${reason}`); + private async goodbyeAndDisconnectPeer(peer: PeerId, reason: GoodByeReason) { + this.logger.debug(`Disconnecting peer ${peer.toString()} with reason ${prettyGoodbyeReason(reason)}`); try { await this.reqresp.sendRequestToPeer(peer, ReqRespSubProtocol.GOODBYE, Buffer.from([reason])); } catch (error) { this.logger.debug(`Failed to send goodbye to peer ${peer.toString()}: ${error}`); + } finally { + await this.disconnectPeer(peer); } + } - await this.libP2PNode.hangUp(peer); + private async disconnectPeer(peer: PeerId) { + try { + await this.libP2PNode.hangUp(peer); + } catch (error) { + this.logger.debug(`Failed to disconnect peer ${peer.toString()}`, { error: inspect(error) }); + } } /** @@ -380,10 +393,15 @@ export class PeerManager extends WithTracer { * Stops the peer manager. * Removing all event listeners. */ - public stop() { + public async stop() { this.libP2PNode.removeEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent); this.libP2PNode.removeEventListener(PeerEvent.DISCONNECTED, this.handleDisconnectedPeerEvent); this.peerDiscoveryService.off(PeerEvent.DISCOVERED, this.handleDiscoveredPeer); + + // Send goodbyes to all peers + await Promise.all( + this.libP2PNode.getPeers().map(peer => this.goodbyeAndDisconnectPeer(peer, GoodByeReason.DISCONNECTED)), + ); } } diff --git a/yarn-project/p2p/src/services/reqresp/interface.ts b/yarn-project/p2p/src/services/reqresp/interface.ts index 698f88bf319..43e5b9a0ecd 100644 --- a/yarn-project/p2p/src/services/reqresp/interface.ts +++ b/yarn-project/p2p/src/services/reqresp/interface.ts @@ -21,7 +21,7 @@ export enum ReqRespSubProtocol { * A handler for a sub protocol * The message will arrive as a buffer, and the handler must return a buffer */ -export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise; +export type ReqRespSubProtocolHandler = (peerId: PeerId, msg: Buffer) => Promise; /** * A type mapping from supprotocol to it's rate limits diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts index 9440a2cc003..4e46bbb3ecb 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts @@ -2,7 +2,8 @@ import { createLogger } from '@aztec/foundation/log'; import { type PeerId } from '@libp2p/interface'; -import { ReqRespSubProtocol } from '../interface.js'; +import { type PeerManager } from '../../peer_manager.js'; +import { ReqRespSubProtocol, type ReqRespSubProtocolHandler } from '../interface.js'; import { type ReqResp } from '../reqresp.js'; // TODO: implement fully @@ -23,6 +24,24 @@ export enum GoodByeReason { BANNED = 0x4, } +/** + * Pretty prints the goodbye reason. + * @param reason - The goodbye reason. + * @returns The pretty printed goodbye reason. + */ +export function prettyGoodbyeReason(reason: GoodByeReason): string { + switch (reason) { + case GoodByeReason.SHUTDOWN: + return 'shutdown'; + case GoodByeReason.DISCONNECTED: + return 'disconnected'; + case GoodByeReason.LOW_SCORE: + return 'low score'; + case GoodByeReason.BANNED: + return 'banned'; + } +} + /** * Handles a goodbye message request */ @@ -46,6 +65,11 @@ export class GoodbyeProtocolHandler { * @param _msg - The goodbye request message. * @returns A resolved promise with the goodbye response. */ -export function goodbyeHandler(_msg: any): Promise { - return Promise.resolve(Buffer.from('goodbye')); +export function reqGoodbyeHandler(peerManager: PeerManager): ReqRespSubProtocolHandler { + return (peerId: PeerId, _msg: Buffer) => { + peerManager.goodbyeReceived(peerId); + + // TODO(md): they want to receive some kind of response, but we don't have a response here + return Promise.resolve(Buffer.from('')); + }; } diff --git a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts index 9aabc5a8185..415cf4293c6 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts @@ -1,7 +1,10 @@ import { type P2PClientType } from '@aztec/circuit-types'; import { TxHash } from '@aztec/circuit-types/tx_hash'; +import { type PeerId } from '@libp2p/interface'; + import { type MemPools } from '../../../mem_pools/interface.js'; +import { type ReqRespSubProtocolHandler } from '../interface.js'; /** * We want to keep the logic of the req resp handler in this file, but we do not have a reference to the mempools here @@ -11,13 +14,13 @@ import { type MemPools } from '../../../mem_pools/interface.js'; * @param mempools - the mempools * @returns the tx response message */ -export function reqRespTxHandler(mempools: MemPools): (msg: Buffer) => Promise { +export function reqRespTxHandler(mempools: MemPools): ReqRespSubProtocolHandler { /** * Handler for tx requests * @param msg - the tx request message * @returns the tx response message */ - return (msg: Buffer) => { + return (_peerId: PeerId, msg: Buffer) => { const txHash = TxHash.fromBuffer(msg); const foundTx = mempools.txPool.getTxByHash(txHash); const buf = foundTx ? foundTx.toBuffer() : Buffer.alloc(0); diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts index 08bff68bec9..999cbb16e91 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts +++ b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts @@ -4,6 +4,7 @@ import { jest } from '@jest/globals'; import { type PeerId } from '@libp2p/interface'; import { type MockProxy, mock } from 'jest-mock-extended'; +import { type PeerScoring } from '../../peer-scoring/peer_scoring.js'; import { type PeerManager } from '../../peer_manager.js'; import { ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js'; import { RequestResponseRateLimiter } from './rate_limiter.js'; @@ -24,7 +25,7 @@ const makePeer = (id: string): PeerId => { describe('rate limiter', () => { let rateLimiter: RequestResponseRateLimiter; - let peerManager: MockProxy; + let peerScoring: MockProxy; beforeEach(() => { jest.useFakeTimers(); @@ -43,9 +44,9 @@ describe('rate limiter', () => { }, } as ReqRespSubProtocolRateLimits; // force type as we will not provide descriptions of all protocols - peerManager = mock(); + peerScoring = mock(); - rateLimiter = new RequestResponseRateLimiter(peerManager, config); + rateLimiter = new RequestResponseRateLimiter(peerScoring, config); }); afterEach(() => { @@ -77,7 +78,7 @@ describe('rate limiter', () => { expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false); // Spy on the peer manager and check that penalizePeer is called - expect(peerManager.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.MidToleranceError); + expect(peerScoring.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.MidToleranceError); }); it('Should allow requests within the global limit', () => { @@ -137,7 +138,7 @@ describe('rate limiter', () => { }, }, } as ReqRespSubProtocolRateLimits; - const multiProtocolRateLimiter = new RequestResponseRateLimiter(peerManager, config); + const multiProtocolRateLimiter = new RequestResponseRateLimiter(peerScoring, config); const peerId = makePeer('peer1'); @@ -157,7 +158,7 @@ describe('rate limiter', () => { }); it('Should allow requests if no rate limiter is configured', () => { - const rateLimiter = new RequestResponseRateLimiter(peerManager, {} as ReqRespSubProtocolRateLimits); + const rateLimiter = new RequestResponseRateLimiter(peerScoring, {} as ReqRespSubProtocolRateLimits); expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer('peer1'))).toBe(true); }); diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts index 6ae74dcc5fa..cbb41061e77 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts +++ b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts @@ -7,7 +7,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { type PeerId } from '@libp2p/interface'; -import { PeerScoring } from '../../peer-scoring/peer_scoring.js'; +import { type PeerScoring } from '../../peer-scoring/peer_scoring.js'; import { type ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js'; import { DEFAULT_RATE_LIMITS } from './rate_limits.js'; diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index 86a66f59d4e..a8325dedfdb 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -2,6 +2,7 @@ import { PeerErrorSeverity, TxHash, mockTx } from '@aztec/circuit-types'; import { sleep } from '@aztec/foundation/sleep'; import { describe, expect, it, jest } from '@jest/globals'; +import { type PeerId } from '@libp2p/interface'; import { type MockProxy, mock } from 'jest-mock-extended'; import { CollectiveReqRespTimeoutError, IndividualReqRespTimeoutError } from '../../errors/reqresp.error.js'; @@ -14,19 +15,21 @@ import { startNodes, stopNodes, } from '../../mocks/index.js'; +import { type PeerScoring } from '../peer-scoring/peer_scoring.js'; import { type PeerManager } from '../peer_manager.js'; import { ReqRespSubProtocol, RequestableBuffer } from './interface.js'; +import { GoodByeReason } from './protocols/goodbye.js'; const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping')); // The Req Resp protocol should allow nodes to dial specific peers // and ask for specific data that they missed via the traditional gossip protocol. describe('ReqResp', () => { - let peerManager: MockProxy; + let peerScoring: MockProxy; let nodes: ReqRespNode[]; beforeEach(() => { - peerManager = mock(); + peerScoring = mock(); }); afterEach(async () => { @@ -38,7 +41,7 @@ describe('ReqResp', () => { it('Should perform a ping request', async () => { // Create two nodes // They need to discover each other - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); const { req: pinger } = nodes[0]; await startNodes(nodes); @@ -55,7 +58,7 @@ describe('ReqResp', () => { }); it('Should handle gracefully if a peer connected peer is offline', async () => { - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); const { req: pinger } = nodes[0]; const { req: ponger } = nodes[1]; @@ -74,7 +77,7 @@ describe('ReqResp', () => { }); it('Should request from a later peer if other peers are offline', async () => { - nodes = await createNodes(peerManager, 4); + nodes = await createNodes(peerScoring, 4); await startNodes(nodes); await sleep(500); @@ -110,7 +113,7 @@ describe('ReqResp', () => { }); it('Should hit a rate limit if too many requests are made in quick succession', async () => { - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); await startNodes(nodes); @@ -137,7 +140,7 @@ describe('ReqResp', () => { const txHash = tx.getTxHash(); const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; - protocolHandlers[ReqRespSubProtocol.TX] = (message: Buffer): Promise => { + protocolHandlers[ReqRespSubProtocol.TX] = (_peerId: PeerId, message: Buffer): Promise => { const receivedHash = TxHash.fromBuffer(message); if (txHash.equals(receivedHash)) { return Promise.resolve(tx.toBuffer()); @@ -145,7 +148,7 @@ describe('ReqResp', () => { return Promise.resolve(Buffer.from('')); }; - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); await startNodes(nodes, protocolHandlers); await sleep(500); @@ -163,11 +166,11 @@ describe('ReqResp', () => { const txHash = tx.getTxHash(); const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; - protocolHandlers[ReqRespSubProtocol.TX] = (_message: Buffer): Promise => { + protocolHandlers[ReqRespSubProtocol.TX] = (_peerId: PeerId, _message: Buffer): Promise => { return Promise.resolve(Buffer.alloc(0)); }; - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); const spySendRequestToPeer = jest.spyOn(nodes[0].req, 'sendRequestToPeer'); @@ -187,11 +190,11 @@ describe('ReqResp', () => { const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; // Return nothing - protocolHandlers[ReqRespSubProtocol.TX] = (_message: Buffer): Promise => { + protocolHandlers[ReqRespSubProtocol.TX] = (_peerId: PeerId, _message: Buffer): Promise => { return Promise.resolve(Buffer.from('')); }; - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); await startNodes(nodes, protocolHandlers); await sleep(500); @@ -203,7 +206,7 @@ describe('ReqResp', () => { }); it('Should hit individual timeout if nothing is returned over the stream', async () => { - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); await startNodes(nodes); @@ -235,7 +238,7 @@ describe('ReqResp', () => { ); // Expect the peer to be penalized for timing out - expect(peerManager.penalizePeer).toHaveBeenCalledWith( + expect(peerScoring.penalizePeer).toHaveBeenCalledWith( expect.objectContaining({ publicKey: nodes[1].p2p.peerId.publicKey, // must use objectContaining as we do not match exactly, as private key is contained in this test mapping }), @@ -244,7 +247,7 @@ describe('ReqResp', () => { }); it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => { - nodes = await createNodes(peerManager, 4); + nodes = await createNodes(peerScoring, 4); await startNodes(nodes); @@ -276,7 +279,7 @@ describe('ReqResp', () => { // Mock that the node will respond with the tx const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; - protocolHandlers[ReqRespSubProtocol.TX] = (message: Buffer): Promise => { + protocolHandlers[ReqRespSubProtocol.TX] = (_peerId: PeerId, message: Buffer): Promise => { const receivedHash = TxHash.fromBuffer(message); if (txHash.equals(receivedHash)) { return Promise.resolve(tx.toBuffer()); @@ -287,11 +290,11 @@ describe('ReqResp', () => { // Mock that the receiving node will find that the transaction is invalid const protocolValidators = MOCK_SUB_PROTOCOL_VALIDATORS; protocolValidators[ReqRespSubProtocol.TX] = (_request, _response, peer) => { - peerManager.penalizePeer(peer, PeerErrorSeverity.LowToleranceError); + peerScoring.penalizePeer(peer, PeerErrorSeverity.LowToleranceError); return Promise.resolve(false); }; - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); await startNodes(nodes, protocolHandlers, protocolValidators); await sleep(500); @@ -302,7 +305,7 @@ describe('ReqResp', () => { expect(res).toBeUndefined(); // Expect the peer to be penalized for sending an invalid response - expect(peerManager.penalizePeer).toHaveBeenCalledWith( + expect(peerScoring.penalizePeer).toHaveBeenCalledWith( expect.objectContaining({ publicKey: nodes[1].p2p.peerId.publicKey, // must use objectContaining as we do not match exactly, as private key is contained in this test mapping }), @@ -310,4 +313,21 @@ describe('ReqResp', () => { ); }); }); + + describe('Goodbye protocol', () => { + it('Should send a goodbye message to a peer', async () => { + const nodes = await createNodes(peerScoring, 2); + + await startNodes(nodes); + await sleep(500); + await connectToPeers(nodes); + await sleep(500); + + await nodes[0].req.sendRequestToPeer( + nodes[1].p2p.peerId, + ReqRespSubProtocol.GOODBYE, + Buffer.from([GoodByeReason.SHUTDOWN]), + ); + }); + }); }); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index 5a905ec8723..8d05c4b1480 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -14,7 +14,7 @@ import { InvalidResponseError, } from '../../errors/reqresp.error.js'; import { SnappyTransform } from '../encoding.js'; -import { PeerScoring } from '../peer-scoring/peer_scoring.js'; +import { type PeerScoring } from '../peer-scoring/peer_scoring.js'; import { type P2PReqRespConfig } from './config.js'; import { DEFAULT_SUB_PROTOCOL_HANDLERS, @@ -339,7 +339,7 @@ export class ReqResp { async function* (source: any) { for await (const chunkList of source) { const msg = Buffer.from(chunkList.subarray()); - const response = await handler(msg); + const response = await handler(connection.remotePeer, msg); yield new Uint8Array(transform.outboundTransformNoTopic(response)); } }, From ecb281d64b86d3db5699e425d610225e7aa8b9a6 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Fri, 20 Dec 2024 22:20:11 +0000 Subject: [PATCH 06/11] fix: mock --- yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts | 5 +++-- yarn-project/p2p/src/services/peer_manager.test.ts | 3 +-- .../p2p/src/services/reqresp/protocols/goodbye_protocol.ts | 0 3 files changed, 4 insertions(+), 4 deletions(-) delete mode 100644 yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts diff --git a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts b/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts index 9fe98041caf..1fa1eb141dc 100644 --- a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts +++ b/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts @@ -88,11 +88,12 @@ export class PeerScoring { } public getScoreState(peerId: string): PeerScoreState { + // TODO: permanently store banned peers? const score = this.getScore(peerId); - if (score <= MIN_SCORE_BEFORE_BAN) { + if (score < MIN_SCORE_BEFORE_BAN) { return PeerScoreState.Banned; } - if (score <= MIN_SCORE_BEFORE_DISCONNECT) { + if (score < MIN_SCORE_BEFORE_DISCONNECT) { return PeerScoreState.Disconnect; } return PeerScoreState.Healthy; diff --git a/yarn-project/p2p/src/services/peer_manager.test.ts b/yarn-project/p2p/src/services/peer_manager.test.ts index 92b2012e167..1d7c0d6acca 100644 --- a/yarn-project/p2p/src/services/peer_manager.test.ts +++ b/yarn-project/p2p/src/services/peer_manager.test.ts @@ -323,8 +323,7 @@ describe('PeerManager', () => { }); it('should properly clean up peers on stop', async () => { - const enr = await createMockENR(); - await discoveredPeerCallback(enr); + mockLibP2PNode.getPeers.mockReturnValue([await createSecp256k1PeerId(), await createSecp256k1PeerId()]); await peerManager.stop(); diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye_protocol.ts deleted file mode 100644 index e69de29bb2d..00000000000 From 8e7da8b282407ccda789033c6e0a30ac90056bc5 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sat, 18 Jan 2025 02:02:03 +0000 Subject: [PATCH 07/11] fix test --- yarn-project/p2p/src/services/peer_manager.test.ts | 2 ++ .../p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts | 1 - yarn-project/p2p/src/services/reqresp/reqresp.test.ts | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn-project/p2p/src/services/peer_manager.test.ts b/yarn-project/p2p/src/services/peer_manager.test.ts index 1d7c0d6acca..5619c667e38 100644 --- a/yarn-project/p2p/src/services/peer_manager.test.ts +++ b/yarn-project/p2p/src/services/peer_manager.test.ts @@ -292,6 +292,7 @@ describe('PeerManager', () => { // Set the peer scores to trigger different states peerManager.penalizePeer(bannedPeerId, PeerErrorSeverity.LowToleranceError); // Will set score below -100 peerManager.penalizePeer(bannedPeerId, PeerErrorSeverity.LowToleranceError); // Additional penalty to ensure banned state + peerManager.penalizePeer(bannedPeerId, PeerErrorSeverity.HighToleranceError); peerManager.penalizePeer(disconnectPeerId, PeerErrorSeverity.LowToleranceError); // Will set score between -100 and -50 peerManager.penalizePeer(disconnectPeerId, PeerErrorSeverity.HighToleranceError); @@ -315,6 +316,7 @@ describe('PeerManager', () => { ReqRespSubProtocol.GOODBYE, Buffer.from([GoodByeReason.DISCONNECTED]), ); + // Verify that hangUp was not called for the healthy peer expect(mockLibP2PNode.hangUp).not.toHaveBeenCalledWith(healthyPeerId); diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts index 999cbb16e91..b582a9146fb 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts +++ b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts @@ -5,7 +5,6 @@ import { type PeerId } from '@libp2p/interface'; import { type MockProxy, mock } from 'jest-mock-extended'; import { type PeerScoring } from '../../peer-scoring/peer_scoring.js'; -import { type PeerManager } from '../../peer_manager.js'; import { ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js'; import { RequestResponseRateLimiter } from './rate_limiter.js'; diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index a8325dedfdb..86f655eb14d 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -16,7 +16,6 @@ import { stopNodes, } from '../../mocks/index.js'; import { type PeerScoring } from '../peer-scoring/peer_scoring.js'; -import { type PeerManager } from '../peer_manager.js'; import { ReqRespSubProtocol, RequestableBuffer } from './interface.js'; import { GoodByeReason } from './protocols/goodbye.js'; From ee9347390000d18bc9a026e1e3d0d5a8c97fab0f Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sat, 18 Jan 2025 16:27:53 +0000 Subject: [PATCH 08/11] chore: move peer manager into services, add metrics for disconnected peers --- yarn-project/p2p/src/mocks/index.ts | 2 +- .../p2p/src/services/libp2p/libp2p_service.ts | 4 +-- .../p2p/src/services/peer-manager/metrics.ts | 31 +++++++++++++++++++ .../{ => peer-manager}/peer_manager.test.ts | 10 +++--- .../{ => peer-manager}/peer_manager.ts | 31 ++++++++++++------- .../peer_scoring.test.ts | 0 .../peer_scoring.ts | 0 .../src/services/reqresp/protocols/goodbye.ts | 2 +- .../reqresp/rate_limiter/rate_limiter.test.ts | 2 +- .../reqresp/rate_limiter/rate_limiter.ts | 2 +- .../p2p/src/services/reqresp/reqresp.test.ts | 2 +- .../p2p/src/services/reqresp/reqresp.ts | 2 +- .../telemetry-client/src/attributes.ts | 2 ++ yarn-project/telemetry-client/src/metrics.ts | 2 ++ 14 files changed, 68 insertions(+), 24 deletions(-) create mode 100644 yarn-project/p2p/src/services/peer-manager/metrics.ts rename yarn-project/p2p/src/services/{ => peer-manager}/peer_manager.test.ts (97%) rename yarn-project/p2p/src/services/{ => peer-manager}/peer_manager.ts (94%) rename yarn-project/p2p/src/services/{peer-scoring => peer-manager}/peer_scoring.test.ts (100%) rename yarn-project/p2p/src/services/{peer-scoring => peer-manager}/peer_scoring.ts (100%) diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/mocks/index.ts index af8366906d5..7788db5e73f 100644 --- a/yarn-project/p2p/src/mocks/index.ts +++ b/yarn-project/p2p/src/mocks/index.ts @@ -26,7 +26,7 @@ import { type BootnodeConfig, type P2PConfig } from '../config.js'; import { type MemPools } from '../mem_pools/interface.js'; import { DiscV5Service } from '../services/discv5/discV5_service.js'; import { LibP2PService } from '../services/libp2p/libp2p_service.js'; -import { type PeerScoring } from '../services/peer-scoring/peer_scoring.js'; +import { type PeerScoring } from '../services/peer-manager/peer_scoring.js'; import { type P2PReqRespConfig } from '../services/reqresp/config.js'; import { ReqRespSubProtocol, diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 8348c2279db..df84659fea4 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -56,8 +56,8 @@ import { import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js'; import { AztecDatastore } from '../data_store.js'; import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from '../encoding.js'; -import { PeerScoring } from '../peer-scoring/peer_scoring.js'; -import { PeerManager } from '../peer_manager.js'; +import { PeerManager } from '../peer-manager/peer_manager.js'; +import { PeerScoring } from '../peer-manager/peer_scoring.js'; import { DEFAULT_SUB_PROTOCOL_VALIDATORS, ReqRespSubProtocol, type SubProtocolMap } from '../reqresp/interface.js'; import { reqGoodbyeHandler } from '../reqresp/protocols/goodbye.js'; import { pingHandler, statusHandler } from '../reqresp/protocols/index.js'; diff --git a/yarn-project/p2p/src/services/peer-manager/metrics.ts b/yarn-project/p2p/src/services/peer-manager/metrics.ts new file mode 100644 index 00000000000..f5d0356179a --- /dev/null +++ b/yarn-project/p2p/src/services/peer-manager/metrics.ts @@ -0,0 +1,31 @@ +import { + Attributes, + Metrics, + type TelemetryClient, + type Tracer, + type UpDownCounter, + ValueType, +} from '@aztec/telemetry-client'; + +import { type GoodByeReason, prettyGoodbyeReason } from '../reqresp/protocols/index.js'; + +export class PeerManagerMetrics { + private disconnectedPeers: UpDownCounter; + + public readonly tracer: Tracer; + + constructor(public readonly telemetryClient: TelemetryClient, name = 'PeerManager') { + this.tracer = telemetryClient.getTracer(name); + + const meter = telemetryClient.getMeter(name); + this.disconnectedPeers = meter.createUpDownCounter(Metrics.PEER_MANAGER_DISCONNECTED_PEERS, { + description: 'Number of disconnected peers', + unit: 'peers', + valueType: ValueType.INT, + }); + } + + public recordDisconnectedPeer(reason: GoodByeReason) { + this.disconnectedPeers.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) }); + } +} diff --git a/yarn-project/p2p/src/services/peer_manager.test.ts b/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts similarity index 97% rename from yarn-project/p2p/src/services/peer_manager.test.ts rename to yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts index 5619c667e38..41e71c702ca 100644 --- a/yarn-project/p2p/src/services/peer_manager.test.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts @@ -8,12 +8,12 @@ import { jest } from '@jest/globals'; import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; import { multiaddr } from '@multiformats/multiaddr'; -import { type P2PConfig, getP2PDefaultConfig } from '../config.js'; -import { PeerScoring } from './peer-scoring/peer_scoring.js'; +import { type P2PConfig, getP2PDefaultConfig } from '../../config.js'; +import { ReqRespSubProtocol } from '../reqresp/interface.js'; +import { GoodByeReason } from '../reqresp/protocols/index.js'; +import { PeerEvent } from '../types.js'; import { PeerManager } from './peer_manager.js'; -import { ReqRespSubProtocol } from './reqresp/interface.js'; -import { GoodByeReason } from './reqresp/protocols/index.js'; -import { PeerEvent } from './types.js'; +import { PeerScoring } from './peer_scoring.js'; describe('PeerManager', () => { const mockLibP2PNode: any = { diff --git a/yarn-project/p2p/src/services/peer_manager.ts b/yarn-project/p2p/src/services/peer-manager/peer_manager.ts similarity index 94% rename from yarn-project/p2p/src/services/peer_manager.ts rename to yarn-project/p2p/src/services/peer-manager/peer_manager.ts index 99bc0a196ac..38fd02f3e65 100644 --- a/yarn-project/p2p/src/services/peer_manager.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_manager.ts @@ -7,14 +7,15 @@ import { type Connection, type PeerId } from '@libp2p/interface'; import { type Multiaddr } from '@multiformats/multiaddr'; import { inspect } from 'util'; -import { type P2PConfig } from '../config.js'; -import { type PubSubLibp2p } from '../util.js'; -import { PeerScoreState, type PeerScoring } from './peer-scoring/peer_scoring.js'; -import { ReqRespSubProtocol } from './reqresp/interface.js'; -import { GoodByeReason, prettyGoodbyeReason } from './reqresp/protocols/goodbye.js'; -import { type ReqResp } from './reqresp/reqresp.js'; -import { type PeerDiscoveryService } from './service.js'; -import { PeerEvent } from './types.js'; +import { type P2PConfig } from '../../config.js'; +import { type PubSubLibp2p } from '../../util.js'; +import { ReqRespSubProtocol } from '../reqresp/interface.js'; +import { GoodByeReason, prettyGoodbyeReason } from '../reqresp/protocols/goodbye.js'; +import { type ReqResp } from '../reqresp/reqresp.js'; +import { type PeerDiscoveryService } from '../service.js'; +import { PeerEvent } from '../types.js'; +import { PeerManagerMetrics } from './metrics.js'; +import { PeerScoreState, type PeerScoring } from './peer_scoring.js'; const MAX_DIAL_ATTEMPTS = 3; const MAX_CACHED_PEERS = 100; @@ -34,12 +35,14 @@ type TimedOutPeer = { timeoutUntilMs: number; }; -export class PeerManager extends WithTracer { +export class PeerManager { private cachedPeers: Map = new Map(); private heartbeatCounter: number = 0; private displayPeerCountsPeerHeartbeat: number = 0; private timedOutPeers: Map = new Map(); + private metrics: PeerManagerMetrics; + constructor( private libP2PNode: PubSubLibp2p, private peerDiscoveryService: PeerDiscoveryService, @@ -49,7 +52,7 @@ export class PeerManager extends WithTracer { private peerScoring: PeerScoring, private reqresp: ReqResp, ) { - super(telemetryClient, 'PeerManager'); + this.metrics = new PeerManagerMetrics(telemetryClient, 'PeerManager'); // Handle new established connections this.libP2PNode.addEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent.bind(this)); @@ -63,6 +66,10 @@ export class PeerManager extends WithTracer { this.displayPeerCountsPeerHeartbeat = Math.floor(60_000 / this.config.peerCheckIntervalMS); } + get tracer() { + return this.metrics.tracer; + } + @trackSpan('PeerManager.heartbeat') public heartbeat() { this.heartbeatCounter++; @@ -251,6 +258,8 @@ export class PeerManager extends WithTracer { private async goodbyeAndDisconnectPeer(peer: PeerId, reason: GoodByeReason) { this.logger.debug(`Disconnecting peer ${peer.toString()} with reason ${prettyGoodbyeReason(reason)}`); + this.metrics.recordDisconnectedPeer(reason); + try { await this.reqresp.sendRequestToPeer(peer, ReqRespSubProtocol.GOODBYE, Buffer.from([reason])); } catch (error) { @@ -303,7 +312,7 @@ export class PeerManager extends WithTracer { } // check if peer is already connected const connections = this.libP2PNode.getConnections(); - if (connections.some(conn => conn.remotePeer.equals(peerId))) { + if (connections.some((conn: Connection) => conn.remotePeer.equals(peerId))) { this.logger.trace(`Already connected to peer ${peerId}`); return; } diff --git a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.test.ts b/yarn-project/p2p/src/services/peer-manager/peer_scoring.test.ts similarity index 100% rename from yarn-project/p2p/src/services/peer-scoring/peer_scoring.test.ts rename to yarn-project/p2p/src/services/peer-manager/peer_scoring.test.ts diff --git a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts b/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts similarity index 100% rename from yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts rename to yarn-project/p2p/src/services/peer-manager/peer_scoring.ts diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts index 4e46bbb3ecb..d9a74de2b39 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts @@ -2,7 +2,7 @@ import { createLogger } from '@aztec/foundation/log'; import { type PeerId } from '@libp2p/interface'; -import { type PeerManager } from '../../peer_manager.js'; +import { type PeerManager } from '../../peer-manager/peer_manager.js'; import { ReqRespSubProtocol, type ReqRespSubProtocolHandler } from '../interface.js'; import { type ReqResp } from '../reqresp.js'; diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts index b582a9146fb..68c6e044f3f 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts +++ b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts @@ -4,7 +4,7 @@ import { jest } from '@jest/globals'; import { type PeerId } from '@libp2p/interface'; import { type MockProxy, mock } from 'jest-mock-extended'; -import { type PeerScoring } from '../../peer-scoring/peer_scoring.js'; +import { type PeerScoring } from '../../peer-manager/peer_scoring.js'; import { ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js'; import { RequestResponseRateLimiter } from './rate_limiter.js'; diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts index cbb41061e77..5477b65d295 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts +++ b/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts @@ -7,7 +7,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { type PeerId } from '@libp2p/interface'; -import { type PeerScoring } from '../../peer-scoring/peer_scoring.js'; +import { type PeerScoring } from '../../peer-manager/peer_scoring.js'; import { type ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js'; import { DEFAULT_RATE_LIMITS } from './rate_limits.js'; diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index 86f655eb14d..3a3e9e4e7ac 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -15,7 +15,7 @@ import { startNodes, stopNodes, } from '../../mocks/index.js'; -import { type PeerScoring } from '../peer-scoring/peer_scoring.js'; +import { type PeerScoring } from '../peer-manager/peer_scoring.js'; import { ReqRespSubProtocol, RequestableBuffer } from './interface.js'; import { GoodByeReason } from './protocols/goodbye.js'; diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index 8d05c4b1480..1e4865af2bb 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -14,7 +14,7 @@ import { InvalidResponseError, } from '../../errors/reqresp.error.js'; import { SnappyTransform } from '../encoding.js'; -import { type PeerScoring } from '../peer-scoring/peer_scoring.js'; +import { type PeerScoring } from '../peer-manager/peer_scoring.js'; import { type P2PReqRespConfig } from './config.js'; import { DEFAULT_SUB_PROTOCOL_HANDLERS, diff --git a/yarn-project/telemetry-client/src/attributes.ts b/yarn-project/telemetry-client/src/attributes.ts index 56b1105052b..15fdeafa62f 100644 --- a/yarn-project/telemetry-client/src/attributes.ts +++ b/yarn-project/telemetry-client/src/attributes.ts @@ -70,6 +70,8 @@ export const L1_TX_TYPE = 'aztec.l1.tx_type'; export const L1_SENDER = 'aztec.l1.sender'; /** The phase of the transaction */ export const TX_PHASE_NAME = 'aztec.tx.phase_name'; +/** The reason for disconnecting a peer */ +export const P2P_GOODBYE_REASON = 'aztec.p2p.goodbye.reason'; /** The proving job type */ export const PROVING_JOB_TYPE = 'aztec.proving.job_type'; /** The proving job id */ diff --git a/yarn-project/telemetry-client/src/metrics.ts b/yarn-project/telemetry-client/src/metrics.ts index ccd4ed7c5fc..22a039b8112 100644 --- a/yarn-project/telemetry-client/src/metrics.ts +++ b/yarn-project/telemetry-client/src/metrics.ts @@ -68,6 +68,8 @@ export const L1_PUBLISHER_TX_CALLDATA_GAS = 'aztec.l1_publisher.tx_calldata_gas' export const L1_PUBLISHER_TX_BLOBDATA_GAS_USED = 'aztec.l1_publisher.tx_blobdata_gas_used'; export const L1_PUBLISHER_TX_BLOBDATA_GAS_COST = 'aztec.l1_publisher.tx_blobdata_gas_cost'; +export const PEER_MANAGER_DISCONNECTED_PEERS = 'aztec.peer_manager.disconnected_peers'; + export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration'; export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count'; export const PUBLIC_PROCESSOR_TX_PHASE_COUNT = 'aztec.public_processor.tx_phase_count'; From b755ab56748e07f5c673c0d7051cc67fd40ef37d Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sat, 18 Jan 2025 17:38:52 +0000 Subject: [PATCH 09/11] chore: reqresp + goodbye tests --- .../p2p/src/services/peer-manager/metrics.ts | 20 ++++-- .../peer-manager/peer_manager.test.ts | 61 ++++++++++++++++++- .../src/services/peer-manager/peer_manager.ts | 25 +++++--- .../reqresp/protocols/goodbye.test.ts | 15 +++++ .../src/services/reqresp/protocols/goodbye.ts | 38 +++++++++--- .../p2p/src/services/reqresp/reqresp.test.ts | 28 +++++++-- yarn-project/telemetry-client/src/metrics.ts | 3 +- 7 files changed, 162 insertions(+), 28 deletions(-) create mode 100644 yarn-project/p2p/src/services/reqresp/protocols/goodbye.test.ts diff --git a/yarn-project/p2p/src/services/peer-manager/metrics.ts b/yarn-project/p2p/src/services/peer-manager/metrics.ts index f5d0356179a..7725f22ef7d 100644 --- a/yarn-project/p2p/src/services/peer-manager/metrics.ts +++ b/yarn-project/p2p/src/services/peer-manager/metrics.ts @@ -10,7 +10,8 @@ import { import { type GoodByeReason, prettyGoodbyeReason } from '../reqresp/protocols/index.js'; export class PeerManagerMetrics { - private disconnectedPeers: UpDownCounter; + private sentGoodbyes: UpDownCounter; + private receivedGoodbyes: UpDownCounter; public readonly tracer: Tracer; @@ -18,14 +19,23 @@ export class PeerManagerMetrics { this.tracer = telemetryClient.getTracer(name); const meter = telemetryClient.getMeter(name); - this.disconnectedPeers = meter.createUpDownCounter(Metrics.PEER_MANAGER_DISCONNECTED_PEERS, { - description: 'Number of disconnected peers', + this.sentGoodbyes = meter.createUpDownCounter(Metrics.PEER_MANAGER_GOODBYES_SENT, { + description: 'Number of goodbyes sent to peers', unit: 'peers', valueType: ValueType.INT, }); + this.receivedGoodbyes = meter.createUpDownCounter(Metrics.PEER_MANAGER_GOODBYES_RECEIVED, { + description: 'Number of goodbyes received from peers', + unit: 'peers', + valueType: ValueType.INT, + }); + } + + public recordGoodbyeSent(reason: GoodByeReason) { + this.sentGoodbyes.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) }); } - public recordDisconnectedPeer(reason: GoodByeReason) { - this.disconnectedPeers.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) }); + public recordGoodbyeReceived(reason: GoodByeReason) { + this.receivedGoodbyes.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) }); } } diff --git a/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts b/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts index 41e71c702ca..a39a9031950 100644 --- a/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts @@ -1,7 +1,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; import { sleep } from '@aztec/foundation/sleep'; -import { getTelemetryClient } from '@aztec/telemetry-client'; +import { Attributes, getTelemetryClient } from '@aztec/telemetry-client'; import { type ENR, SignableENR } from '@chainsafe/enr'; import { jest } from '@jest/globals'; @@ -132,6 +132,20 @@ describe('PeerManager', () => { // Verify that discover was called expect(mockPeerDiscoveryService.runRandomNodesQuery).toHaveBeenCalled(); }); + + it('should send goodbye to peers on shutdown', async () => { + const peerId = await createSecp256k1PeerId(); + const peerId2 = await createSecp256k1PeerId(); + mockLibP2PNode.getPeers.mockReturnValue([peerId, peerId2]); + + const goodbyeAndDisconnectPeerSpy = jest.spyOn(peerManager as any, 'goodbyeAndDisconnectPeer'); + + await peerManager.stop(); + + // Both peers were sent goodbyes on shutdown + expect(goodbyeAndDisconnectPeerSpy).toHaveBeenCalledWith(peerId, GoodByeReason.SHUTDOWN); + expect(goodbyeAndDisconnectPeerSpy).toHaveBeenCalledWith(peerId2, GoodByeReason.SHUTDOWN); + }); }); describe('peer timeout functionality', () => { @@ -337,4 +351,49 @@ describe('PeerManager', () => { expect(mockReqResp.sendRequestToPeer).toHaveBeenCalledTimes(2); }); }); + + describe('goodbye metrics', () => { + it('should record metrics when receiving goodbye messages', async () => { + const peerId = await createSecp256k1PeerId(); + + // Get reference to the counter's add function + const goodbyeReceivedMetric = jest.spyOn((peerManager as any).metrics.receivedGoodbyes, 'add'); + + // Test receiving goodbye for different reasons + peerManager.goodbyeReceived(peerId, GoodByeReason.BANNED); + expect(goodbyeReceivedMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'banned' }); + + peerManager.goodbyeReceived(peerId, GoodByeReason.DISCONNECTED); + expect(goodbyeReceivedMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'disconnected' }); + + peerManager.goodbyeReceived(peerId, GoodByeReason.SHUTDOWN); + expect(goodbyeReceivedMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'shutdown' }); + }); + + it('should record metrics when sending goodbye messages', async () => { + const peerId = await createSecp256k1PeerId(); + + // Get reference to the counter's add function + const goodbyeSentMetric = jest.spyOn((peerManager as any).metrics.sentGoodbyes, 'add'); + + // Mock connections to include our test peer + mockLibP2PNode.getConnections.mockReturnValue([{ remotePeer: peerId }]); + + // Test sending goodbye for different scenarios + + // Test banned scenario + peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); // Set score below -100 + peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); + peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError); + peerManager.heartbeat(); + expect(goodbyeSentMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'banned' }); + + // Reset mocks + mockLibP2PNode.getPeers.mockReturnValue([{ remotePeer: peerId }]); + + // Test shutdown scenario + await peerManager.stop(); + expect(goodbyeSentMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'shutdown' }); + }); + }); }); diff --git a/yarn-project/p2p/src/services/peer-manager/peer_manager.ts b/yarn-project/p2p/src/services/peer-manager/peer_manager.ts index 38fd02f3e65..6b5b9ba4570 100644 --- a/yarn-project/p2p/src/services/peer-manager/peer_manager.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_manager.ts @@ -1,6 +1,6 @@ import { type PeerErrorSeverity, type PeerInfo } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; -import { type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client'; +import { type TelemetryClient, trackSpan } from '@aztec/telemetry-client'; import { type ENR } from '@chainsafe/enr'; import { type Connection, type PeerId } from '@libp2p/interface'; @@ -123,9 +123,17 @@ export class PeerManager { } } - // TODO: include reason here and add to metrics, but this is fine for now - public goodbyeReceived(peerId: PeerId) { - this.logger.debug(`Goodbye received from peer ${peerId.toString()}`); + /** + * Handles a goodbye received from a peer. + * + * Used as the reqresp handler when a peer sends us goodbye message. + * @param peerId - The peer ID. + * @param reason - The reason for the goodbye. + */ + public goodbyeReceived(peerId: PeerId, reason: GoodByeReason) { + this.logger.debug(`Goodbye received from peer ${peerId.toString()} with reason ${prettyGoodbyeReason(reason)}`); + + this.metrics.recordGoodbyeReceived(reason); void this.disconnectPeer(peerId); } @@ -258,7 +266,7 @@ export class PeerManager { private async goodbyeAndDisconnectPeer(peer: PeerId, reason: GoodByeReason) { this.logger.debug(`Disconnecting peer ${peer.toString()} with reason ${prettyGoodbyeReason(reason)}`); - this.metrics.recordDisconnectedPeer(reason); + this.metrics.recordGoodbyeSent(reason); try { await this.reqresp.sendRequestToPeer(peer, ReqRespSubProtocol.GOODBYE, Buffer.from([reason])); @@ -403,14 +411,15 @@ export class PeerManager { * Removing all event listeners. */ public async stop() { - this.libP2PNode.removeEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent); - this.libP2PNode.removeEventListener(PeerEvent.DISCONNECTED, this.handleDisconnectedPeerEvent); this.peerDiscoveryService.off(PeerEvent.DISCOVERED, this.handleDiscoveredPeer); // Send goodbyes to all peers await Promise.all( - this.libP2PNode.getPeers().map(peer => this.goodbyeAndDisconnectPeer(peer, GoodByeReason.DISCONNECTED)), + this.libP2PNode.getPeers().map(peer => this.goodbyeAndDisconnectPeer(peer, GoodByeReason.SHUTDOWN)), ); + + this.libP2PNode.removeEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent); + this.libP2PNode.removeEventListener(PeerEvent.DISCONNECTED, this.handleDisconnectedPeerEvent); } } diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.test.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.test.ts new file mode 100644 index 00000000000..20554469e3d --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.test.ts @@ -0,0 +1,15 @@ +import { GoodByeReason, decodeGoodbyeReason, encodeGoodbyeReason } from './goodbye.js'; + +describe('goodbye', () => { + it('should encode and decode goodbye reason', () => { + const reason = GoodByeReason.SHUTDOWN; + const encoded = encodeGoodbyeReason(reason); + const decoded = decodeGoodbyeReason(encoded); + expect(decoded).toBe(reason); + }); + + it('should return unknown if the goodbye reason buffer length is invalid', () => { + const invalidBuffer = Buffer.from([0x1, 0x2]); + expect(decodeGoodbyeReason(invalidBuffer)).toBe(GoodByeReason.UNKNOWN); + }); +}); diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts index d9a74de2b39..9811a53638e 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts @@ -6,22 +6,37 @@ import { type PeerManager } from '../../peer-manager/peer_manager.js'; import { ReqRespSubProtocol, type ReqRespSubProtocolHandler } from '../interface.js'; import { type ReqResp } from '../reqresp.js'; -// TODO: implement fully - /** * Enum defining the possible reasons for a goodbye message. */ export enum GoodByeReason { /** The peer has shutdown, will be received whenever a peer's node is routinely stopped */ SHUTDOWN = 0x1, - // TOOD(md): what is the correct values to put in here - read other specs to see reasons - // what is even the point of the reason /** Whenever the peer must disconnect due to maintaining max peers */ DISCONNECTED = 0x2, /** The peer has a low score, will be received whenever a peer's score is low */ LOW_SCORE = 0x3, /** The peer has been banned, will be received whenever a peer is banned */ BANNED = 0x4, + /** Wrong network / fork */ + WRONG_NETWORK = 0x5, + /** Unknown reason */ + UNKNOWN = 0x6, +} + +export function encodeGoodbyeReason(reason: GoodByeReason): Buffer { + return Buffer.from([reason]); +} + +export function decodeGoodbyeReason(buffer: Buffer): GoodByeReason { + try { + if (buffer.length !== 1) { + throw new Error('Invalid goodbye reason buffer length'); + } + return buffer[0] as GoodByeReason; + } catch (error) { + return GoodByeReason.UNKNOWN; + } } /** @@ -36,9 +51,14 @@ export function prettyGoodbyeReason(reason: GoodByeReason): string { case GoodByeReason.DISCONNECTED: return 'disconnected'; case GoodByeReason.LOW_SCORE: - return 'low score'; + return 'low_score'; case GoodByeReason.BANNED: return 'banned'; + // TODO(https://github.com/AztecProtocol/aztec-packages/issues/11328): implement + case GoodByeReason.WRONG_NETWORK: + return 'wrong_network'; + case GoodByeReason.UNKNOWN: + return 'unknown'; } } @@ -67,9 +87,11 @@ export class GoodbyeProtocolHandler { */ export function reqGoodbyeHandler(peerManager: PeerManager): ReqRespSubProtocolHandler { return (peerId: PeerId, _msg: Buffer) => { - peerManager.goodbyeReceived(peerId); + const reason = decodeGoodbyeReason(_msg); + + peerManager.goodbyeReceived(peerId, reason); - // TODO(md): they want to receive some kind of response, but we don't have a response here - return Promise.resolve(Buffer.from('')); + // Return a buffer of length 1 as an acknowledgement + return Promise.resolve(Buffer.from([0x0])); }; } diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index 3a3e9e4e7ac..f611f38af06 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -15,20 +15,23 @@ import { startNodes, stopNodes, } from '../../mocks/index.js'; +import { PeerManager } from '../peer-manager/peer_manager.js'; import { type PeerScoring } from '../peer-manager/peer_scoring.js'; import { ReqRespSubProtocol, RequestableBuffer } from './interface.js'; -import { GoodByeReason } from './protocols/goodbye.js'; +import { GoodByeReason, reqGoodbyeHandler } from './protocols/goodbye.js'; const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping')); // The Req Resp protocol should allow nodes to dial specific peers // and ask for specific data that they missed via the traditional gossip protocol. describe('ReqResp', () => { + let peerManager: MockProxy; let peerScoring: MockProxy; let nodes: ReqRespNode[]; beforeEach(() => { peerScoring = mock(); + peerManager = mock(); }); afterEach(async () => { @@ -314,19 +317,34 @@ describe('ReqResp', () => { }); describe('Goodbye protocol', () => { - it('Should send a goodbye message to a peer', async () => { - const nodes = await createNodes(peerScoring, 2); + it('should send a goodbye message to a peer', async () => { + nodes = await createNodes(peerScoring, 2); - await startNodes(nodes); + const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; + // Req Goodbye Handler is defined in the reqresp.ts file + protocolHandlers[ReqRespSubProtocol.GOODBYE] = reqGoodbyeHandler(peerManager); + + await startNodes(nodes, protocolHandlers); await sleep(500); await connectToPeers(nodes); await sleep(500); - await nodes[0].req.sendRequestToPeer( + const response = await nodes[0].req.sendRequestToPeer( nodes[1].p2p.peerId, ReqRespSubProtocol.GOODBYE, Buffer.from([GoodByeReason.SHUTDOWN]), ); + + // Node 1 Peer manager receives the goodbye from the sending node + expect(peerManager.goodbyeReceived).toHaveBeenCalledWith( + expect.objectContaining({ + publicKey: nodes[0].p2p.peerId.publicKey, + }), + GoodByeReason.SHUTDOWN, + ); + + // Expect the response to be a buffer of length 1 + expect(response).toEqual(Buffer.from([0x0])); }); }); }); diff --git a/yarn-project/telemetry-client/src/metrics.ts b/yarn-project/telemetry-client/src/metrics.ts index 22a039b8112..f755bbde8f4 100644 --- a/yarn-project/telemetry-client/src/metrics.ts +++ b/yarn-project/telemetry-client/src/metrics.ts @@ -68,7 +68,8 @@ export const L1_PUBLISHER_TX_CALLDATA_GAS = 'aztec.l1_publisher.tx_calldata_gas' export const L1_PUBLISHER_TX_BLOBDATA_GAS_USED = 'aztec.l1_publisher.tx_blobdata_gas_used'; export const L1_PUBLISHER_TX_BLOBDATA_GAS_COST = 'aztec.l1_publisher.tx_blobdata_gas_cost'; -export const PEER_MANAGER_DISCONNECTED_PEERS = 'aztec.peer_manager.disconnected_peers'; +export const PEER_MANAGER_GOODBYES_SENT = 'aztec.peer_manager.goodbyes_sent'; +export const PEER_MANAGER_GOODBYES_RECEIVED = 'aztec.peer_manager.goodbyes_received'; export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration'; export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count'; From cc0ca5071035f1b6874f1a03566232b059735a3d Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sat, 18 Jan 2025 18:03:25 +0000 Subject: [PATCH 10/11] fmt --- yarn-project/p2p/src/services/libp2p/libp2p_service.ts | 6 ++++++ .../p2p/src/services/peer-manager/peer_scoring.ts | 2 +- .../p2p/src/services/reqresp/protocols/goodbye.ts | 10 +++++++--- .../reqresp/{rate_limiter => rate-limiter}/index.ts | 0 .../rate_limiter.test.ts | 0 .../{rate_limiter => rate-limiter}/rate_limiter.ts | 0 .../{rate_limiter => rate-limiter}/rate_limits.ts | 1 - yarn-project/p2p/src/services/reqresp/reqresp.test.ts | 2 +- 8 files changed, 15 insertions(+), 6 deletions(-) rename yarn-project/p2p/src/services/reqresp/{rate_limiter => rate-limiter}/index.ts (100%) rename yarn-project/p2p/src/services/reqresp/{rate_limiter => rate-limiter}/rate_limiter.test.ts (100%) rename yarn-project/p2p/src/services/reqresp/{rate_limiter => rate-limiter}/rate_limiter.ts (100%) rename yarn-project/p2p/src/services/reqresp/{rate_limiter => rate-limiter}/rate_limits.ts (93%) diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index df84659fea4..7394469bb28 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -132,6 +132,12 @@ export class LibP2PService extends WithTracer implement this.reqresp, ); + // Update gossipsub score params + this.node.services.pubsub.score.params.appSpecificScore = (peerId: string) => { + return this.peerManager.getPeerScore(peerId); + }; + this.node.services.pubsub.score.params.appSpecificWeight = 10; + this.attestationValidator = new AttestationValidator(epochCache); this.blockProposalValidator = new BlockProposalValidator(epochCache); this.epochProofQuoteValidator = new EpochProofQuoteValidator(epochCache); diff --git a/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts b/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts index 1fa1eb141dc..ffc1b65501f 100644 --- a/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts @@ -88,7 +88,7 @@ export class PeerScoring { } public getScoreState(peerId: string): PeerScoreState { - // TODO: permanently store banned peers? + // TODO(#11329): permanently store banned peers? const score = this.getScore(peerId); if (score < MIN_SCORE_BEFORE_BAN) { return PeerScoreState.Banned; diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts index 9811a53638e..888e9e8d2cd 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts @@ -54,7 +54,7 @@ export function prettyGoodbyeReason(reason: GoodByeReason): string { return 'low_score'; case GoodByeReason.BANNED: return 'banned'; - // TODO(https://github.com/AztecProtocol/aztec-packages/issues/11328): implement + // TODO(#11328): implement case GoodByeReason.WRONG_NETWORK: return 'wrong_network'; case GoodByeReason.UNKNOWN: @@ -81,8 +81,12 @@ export class GoodbyeProtocolHandler { } /** - * Handles the goodbye request. - * @param _msg - The goodbye request message. + * Handles the goodbye request. In request response, the goodbye request is handled by the peer manager. + * + * @dev Implemented as returning a function as the function is bound in the libp2p service, however + * its implementation is here to keep functionality together. + * + * @param peerManager - The peer manager. * @returns A resolved promise with the goodbye response. */ export function reqGoodbyeHandler(peerManager: PeerManager): ReqRespSubProtocolHandler { diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/index.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/index.ts similarity index 100% rename from yarn-project/p2p/src/services/reqresp/rate_limiter/index.ts rename to yarn-project/p2p/src/services/reqresp/rate-limiter/index.ts diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.test.ts similarity index 100% rename from yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts rename to yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.test.ts diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts similarity index 100% rename from yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts rename to yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limits.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts similarity index 93% rename from yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limits.ts rename to yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts index 23bb99dad7a..e0c2dbdb6e4 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limits.ts +++ b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts @@ -32,7 +32,6 @@ export const DEFAULT_RATE_LIMITS: ReqRespSubProtocolRateLimits = { quotaCount: 10, }, }, - // TODO(md): feels like goodbye is an exception to the rule here [ReqRespSubProtocol.GOODBYE]: { peerLimit: { quotaTimeMs: 1000, diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index f611f38af06..c5b0fd43e5b 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -15,7 +15,7 @@ import { startNodes, stopNodes, } from '../../mocks/index.js'; -import { PeerManager } from '../peer-manager/peer_manager.js'; +import { type PeerManager } from '../peer-manager/peer_manager.js'; import { type PeerScoring } from '../peer-manager/peer_scoring.js'; import { ReqRespSubProtocol, RequestableBuffer } from './interface.js'; import { GoodByeReason, reqGoodbyeHandler } from './protocols/goodbye.js'; From ad86e983b5c719b1d3da28ba4115cdc76271efae Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Sat, 18 Jan 2025 20:08:30 +0000 Subject: [PATCH 11/11] fix: rebase --- yarn-project/p2p/src/services/reqresp/reqresp.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index 1e4865af2bb..a242e1357ee 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -25,7 +25,7 @@ import { type SubProtocolMap, subProtocolMap, } from './interface.js'; -import { RequestResponseRateLimiter } from './rate_limiter/rate_limiter.js'; +import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js'; /** * The Request Response Service