diff --git a/yarn-project/end-to-end/src/e2e_p2p/p2p_network.ts b/yarn-project/end-to-end/src/e2e_p2p/p2p_network.ts index c1bf1f211b1..34c5f9dce4c 100644 --- a/yarn-project/end-to-end/src/e2e_p2p/p2p_network.ts +++ b/yarn-project/end-to-end/src/e2e_p2p/p2p_network.ts @@ -8,7 +8,7 @@ import { type Logger, createLogger } from '@aztec/foundation/log'; import { ForwarderAbi, ForwarderBytecode, RollupAbi, TestERC20Abi } from '@aztec/l1-artifacts'; import { SpamContract } from '@aztec/noir-contracts.js/Spam'; import { type BootstrapNode } from '@aztec/p2p'; -import { createBootstrapNodeFromPrivateKey } from '@aztec/p2p/mocks'; +import { createBootstrapNodeFromPrivateKey } from '@aztec/p2p/test-helpers'; import getPort from 'get-port'; import { getContract } from 'viem'; diff --git a/yarn-project/epoch-cache/src/epoch_cache.ts b/yarn-project/epoch-cache/src/epoch_cache.ts index eaa2fca5d45..bb61ce13295 100644 --- a/yarn-project/epoch-cache/src/epoch_cache.ts +++ b/yarn-project/epoch-cache/src/epoch_cache.ts @@ -20,6 +20,20 @@ type EpochAndSlot = { ts: bigint; }; +export interface EpochCacheInterface { + getCommittee(nextSlot: boolean): Promise; + getEpochAndSlotNow(): EpochAndSlot; + getProposerIndexEncoding(epoch: bigint, slot: bigint, seed: bigint): `0x${string}`; + computeProposerIndex(slot: bigint, epoch: bigint, seed: bigint, size: bigint): bigint; + getProposerInCurrentOrNextSlot(): Promise<{ + currentProposer: EthAddress; + nextProposer: EthAddress; + currentSlot: bigint; + nextSlot: bigint; + }>; + isInCommittee(validator: EthAddress): Promise; +} + /** * Epoch cache * @@ -30,7 +44,10 @@ type EpochAndSlot = { * * Note: This class is very dependent on the system clock being in sync. */ -export class EpochCache extends EventEmitter<{ committeeChanged: [EthAddress[], bigint] }> { +export class EpochCache + extends EventEmitter<{ committeeChanged: [EthAddress[], bigint] }> + implements EpochCacheInterface +{ private committee: EthAddress[]; private cachedEpoch: bigint; private cachedSampleSeed: bigint; @@ -99,12 +116,12 @@ export class EpochCache extends EventEmitter<{ committeeChanged: [EthAddress[], return this.getEpochAndSlotAtTimestamp(this.nowInSeconds()); } - getEpochAndSlotInNextSlot(): EpochAndSlot { + private getEpochAndSlotInNextSlot(): EpochAndSlot { const nextSlotTs = this.nowInSeconds() + BigInt(this.l1constants.slotDuration); return this.getEpochAndSlotAtTimestamp(nextSlotTs); } - getEpochAndSlotAtTimestamp(ts: bigint): EpochAndSlot { + private getEpochAndSlotAtTimestamp(ts: bigint): EpochAndSlot { return { epoch: getEpochNumberAtTimestamp(ts, this.l1constants), slot: getSlotAtTimestamp(ts, this.l1constants), diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 366bf694a08..8d23aa4250a 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -42,6 +42,8 @@ export type EnvVar = | 'DATA_DIRECTORY' | 'DATA_STORE_MAP_SIZE_KB' | 'DEBUG' + | 'DEBUG_P2P_DISABLE_MESSAGE_VALIDATION' + | 'DEBUG_P2P_DISABLE_COLOCATION_PENALTY' | 'DEPLOY_AZTEC_CONTRACTS_SALT' | 'DEPLOY_AZTEC_CONTRACTS' | 'ENFORCE_FEES' @@ -83,6 +85,8 @@ export type EnvVar = | 'P2P_GOSSIPSUB_D' | 'P2P_GOSSIPSUB_DHI' | 'P2P_GOSSIPSUB_DLO' + | 'P2P_GOSSIPSUB_DLAZY' + | 'P2P_GOSSIPSUB_FLOOD_PUBLISH' | 'P2P_GOSSIPSUB_INTERVAL_MS' | 'P2P_GOSSIPSUB_MCACHE_GOSSIP' | 'P2P_GOSSIPSUB_MCACHE_LENGTH' diff --git a/yarn-project/foundation/src/log/log-filters.ts b/yarn-project/foundation/src/log/log-filters.ts index b03188193a6..1884f093f98 100644 --- a/yarn-project/foundation/src/log/log-filters.ts +++ b/yarn-project/foundation/src/log/log-filters.ts @@ -4,8 +4,16 @@ export type LogFilters = [string, LogLevel][]; export function getLogLevelFromFilters(filters: LogFilters, module: string): LogLevel | undefined { for (const [filterModule, level] of filters) { - if (module.startsWith(filterModule)) { - return level as LogLevel; + try { + const regex = new RegExp(filterModule); + if (regex.test(module)) { + return level as LogLevel; + } + } catch { + // If regex is invalid, fall back to startsWith check + if (module.startsWith(filterModule)) { + return level as LogLevel; + } } } return undefined; diff --git a/yarn-project/foundation/src/log/pino-logger.ts b/yarn-project/foundation/src/log/pino-logger.ts index e64ebe5e345..10e5847ffd0 100644 --- a/yarn-project/foundation/src/log/pino-logger.ts +++ b/yarn-project/foundation/src/log/pino-logger.ts @@ -11,14 +11,23 @@ import { getLogLevelFromFilters, parseEnv } from './log-filters.js'; import { type LogLevel } from './log-levels.js'; import { type LogData, type LogFn } from './log_fn.js'; -export function createLogger(module: string): Logger { +export function createLogger(module: string, fixedTerms = {}): Logger { module = logNameHandlers.reduce((moduleName, handler) => handler(moduleName), module.replace(/^aztec:/, '')); const pinoLogger = logger.child({ module }, { level: getLogLevelFromFilters(logFilters, module) }); + // Only perform copy of data if fixed terms are provided + const hasFixedTerms = Object.keys(fixedTerms).length > 0; + // We check manually for isLevelEnabled to avoid calling processLogData unnecessarily. // Note that isLevelEnabled is missing from the browser version of pino. const logFn = (level: LogLevel, msg: string, data?: unknown) => - isLevelEnabled(pinoLogger, level) && pinoLogger[level](processLogData((data as LogData) ?? {}), msg); + isLevelEnabled(pinoLogger, level) && + pinoLogger[level]( + hasFixedTerms + ? processLogData({ ...fixedTerms, ...(data ?? {}) } as LogData) + : processLogData((data as LogData) ?? {}), + msg, + ); return { silent: () => {}, @@ -149,7 +158,10 @@ function makeLogger() { if (!isNode) { // We are on the browser. return pino({ ...pinoOpts, browser: { asObject: false } }); - } else if (process.env.JEST_WORKER_ID) { + } + // If running in a child process then cancel this if statement section by uncommenting below + // else if (false) { + else if (process.env.JEST_WORKER_ID) { // We are on jest, so we need sync logging and stream to stderr. // We expect jest/setup.mjs to kick in later and replace set up a pretty logger, // but if for some reason it doesn't, at least we're covered with a default logger. diff --git a/yarn-project/p2p/package.json b/yarn-project/p2p/package.json index cfdf14ea188..7772e62b2fe 100644 --- a/yarn-project/p2p/package.json +++ b/yarn-project/p2p/package.json @@ -4,7 +4,7 @@ "type": "module", "exports": { ".": "./dest/index.js", - "./mocks": "./dest/mocks/index.js", + "./test-helpers": "./dest/test-helpers/index.js", "./bootstrap": "./dest/bootstrap/bootstrap.js", "./config": "./dest/config.js", "./msg_validators": "./dest/msg_validators/index.js" diff --git a/yarn-project/p2p/src/client/factory.ts b/yarn-project/p2p/src/client/factory.ts index d319f746c1b..9b054134962 100644 --- a/yarn-project/p2p/src/client/factory.ts +++ b/yarn-project/p2p/src/client/factory.ts @@ -4,8 +4,8 @@ import { P2PClientType, type WorldStateSynchronizer, } from '@aztec/circuit-types'; -import { type EpochCache } from '@aztec/epoch-cache'; -import { createLogger } from '@aztec/foundation/log'; +import { type EpochCacheInterface } from '@aztec/epoch-cache'; +import { type Logger, createLogger } from '@aztec/foundation/log'; import { type AztecAsyncKVStore } from '@aztec/kv-store'; import { type DataStoreConfig } from '@aztec/kv-store/config'; import { createStore } from '@aztec/kv-store/lmdb-v2'; @@ -29,6 +29,7 @@ type P2PClientDeps = { store?: AztecAsyncKVStore; attestationPool?: T extends P2PClientType.Full ? AttestationPool : undefined; epochProofQuotePool?: EpochProofQuotePool; + logger?: Logger; }; export const createP2PClient = async ( @@ -37,12 +38,12 @@ export const createP2PClient = async ( l2BlockSource: L2BlockSource, proofVerifier: ClientProtocolCircuitVerifier, worldStateSynchronizer: WorldStateSynchronizer, - epochCache: EpochCache, + epochCache: EpochCacheInterface, telemetry: TelemetryClient = getTelemetryClient(), deps: P2PClientDeps = {}, ) => { let config = { ..._config }; - const logger = createLogger('p2p'); + const logger = deps.logger ?? createLogger('p2p'); const store = deps.store ?? (await createStore('p2p', config, createLogger('p2p:lmdb-v2'))); const archive = await createStore('p2p-archive', config, createLogger('p2p-archive:lmdb-v2')); @@ -66,7 +67,12 @@ export const createP2PClient = async ( // Create peer discovery service const peerIdPrivateKey = await getPeerIdPrivateKey(config, store); const peerId = await createLibP2PPeerIdFromPrivateKey(peerIdPrivateKey); - const discoveryService = new DiscV5Service(peerId, config, telemetry); + const discoveryService = new DiscV5Service( + peerId, + config, + telemetry, + createLogger(`${logger.module}:discv5_service`), + ); p2pService = await LibP2PService.new( clientType, @@ -80,6 +86,7 @@ export const createP2PClient = async ( worldStateSynchronizer, store, telemetry, + createLogger(`${logger.module}:libp2p_service`), ); } else { logger.verbose('P2P is disabled. Using dummy P2P service'); diff --git a/yarn-project/p2p/src/client/p2p_client.integration.test.ts b/yarn-project/p2p/src/client/p2p_client.integration.test.ts new file mode 100644 index 00000000000..d2eb6f39efe --- /dev/null +++ b/yarn-project/p2p/src/client/p2p_client.integration.test.ts @@ -0,0 +1,204 @@ +// An integration test for the p2p client to test req resp protocols +import { PeerErrorSeverity, type Tx, type WorldStateSynchronizer, mockTx } from '@aztec/circuit-types'; +import { emptyChainConfig } from '@aztec/circuit-types/config'; +import { type EpochCache } from '@aztec/epoch-cache'; +import { sleep } from '@aztec/foundation/sleep'; + +import { describe, expect, it, jest } from '@jest/globals'; +import { type MockProxy, mock } from 'jest-mock-extended'; + +import { type P2PClient } from '../client/p2p_client.js'; +import { type P2PConfig, getP2PDefaultConfig } from '../config.js'; +import { type AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; +import { type EpochProofQuotePool } from '../mem_pools/epoch_proof_quote_pool/epoch_proof_quote_pool.js'; +import { type TxPool } from '../mem_pools/tx_pool/index.js'; +import { makeTestP2PClients } from '../test-helpers/make-test-p2p-clients.js'; + +const TEST_TIMEOUT = 80000; + +const NUMBER_OF_PEERS = 2; + +describe('p2p client integration', () => { + let txPool: MockProxy; + let attestationPool: MockProxy; + let epochProofQuotePool: MockProxy; + let epochCache: MockProxy; + let worldState: MockProxy; + + let p2pBaseConfig: P2PConfig; + + let clients: P2PClient[] = []; + + beforeEach(() => { + txPool = mock(); + attestationPool = mock(); + epochProofQuotePool = mock(); + epochCache = mock(); + worldState = mock(); + + p2pBaseConfig = { ...emptyChainConfig, ...getP2PDefaultConfig() }; + + txPool.getAllTxs.mockImplementation(() => { + return Promise.resolve([] as Tx[]); + }); + }); + + afterEach(async () => { + if (clients.length > 0) { + await shutdown(clients); + } + }); + + // Shutdown all test clients + const shutdown = async (clients: P2PClient[]) => { + await Promise.all([...clients.map(client => client.stop())]); + await sleep(1000); + clients = []; + }; + + describe('Req Resp', () => { + it( + 'Returns undefined if unable to find a transaction from another peer', + async () => { + // We want to create a set of nodes and request transaction from them + // Not using a before each as a the wind down is not working as expected + clients = await makeTestP2PClients(NUMBER_OF_PEERS, { + p2pBaseConfig: { ...emptyChainConfig, ...getP2PDefaultConfig() }, + mockAttestationPool: attestationPool, + mockEpochProofQuotePool: epochProofQuotePool, + mockTxPool: txPool, + mockEpochCache: epochCache, + mockWorldState: worldState, + }); + const [client1] = clients; + + await sleep(2000); + + // Perform a get tx request from client 1 + const tx = await mockTx(); + const txHash = await tx.getTxHash(); + + const requestedTx = await client1.requestTxByHash(txHash); + expect(requestedTx).toBeUndefined(); + + // await shutdown(clients, bootstrapNode); + await shutdown(clients); + }, + TEST_TIMEOUT, + ); + + it( + 'Can request a transaction from another peer', + async () => { + // We want to create a set of nodes and request transaction from them + clients = await makeTestP2PClients(NUMBER_OF_PEERS, { + p2pBaseConfig, + mockAttestationPool: attestationPool, + mockEpochProofQuotePool: epochProofQuotePool, + mockTxPool: txPool, + mockEpochCache: epochCache, + mockWorldState: worldState, + }); + const [client1] = clients; + + // Give the nodes time to discover each other + await sleep(6000); + + // Perform a get tx request from client 1 + const tx = await mockTx(); + const txHash = await tx.getTxHash(); + // Mock the tx pool to return the tx we are looking for + txPool.getTxByHash.mockImplementationOnce(() => Promise.resolve(tx)); + + const requestedTx = await client1.requestTxByHash(txHash); + + // Expect the tx to be the returned tx to be the same as the one we mocked + expect(requestedTx?.toBuffer()).toStrictEqual(tx.toBuffer()); + + await shutdown(clients); + }, + TEST_TIMEOUT, + ); + + it( + 'Will penalize peers that send invalid proofs', + async () => { + // We want to create a set of nodes and request transaction from them + clients = await makeTestP2PClients(NUMBER_OF_PEERS, { + p2pBaseConfig, + mockAttestationPool: attestationPool, + mockEpochProofQuotePool: epochProofQuotePool, + mockTxPool: txPool, + mockEpochCache: epochCache, + mockWorldState: worldState, + alwaysTrueVerifier: false, + }); + const [client1, client2] = clients; + const client2PeerId = await client2.getEnr()!.peerId(); + + // Give the nodes time to discover each other + await sleep(6000); + + const penalizePeerSpy = jest.spyOn((client1 as any).p2pService.peerManager, 'penalizePeer'); + + // Perform a get tx request from client 1 + const tx = await mockTx(); + const txHash = await tx.getTxHash(); + + // Return the correct tx with an invalid proof -> active attack + txPool.getTxByHash.mockImplementationOnce(() => Promise.resolve(tx)); + + const requestedTx = await client1.requestTxByHash(txHash); + // Even though we got a response, the proof was deemed invalid + expect(requestedTx).toBeUndefined(); + + // Low tolerance error is due to the invalid proof + expect(penalizePeerSpy).toHaveBeenCalledWith(client2PeerId, PeerErrorSeverity.LowToleranceError); + + await shutdown(clients); + }, + TEST_TIMEOUT, + ); + + it( + 'Will penalize peers that send the wrong transaction', + async () => { + // We want to create a set of nodes and request transaction from them + clients = await makeTestP2PClients(NUMBER_OF_PEERS, { + p2pBaseConfig, + mockAttestationPool: attestationPool, + mockEpochProofQuotePool: epochProofQuotePool, + mockTxPool: txPool, + mockEpochCache: epochCache, + mockWorldState: worldState, + alwaysTrueVerifier: true, + }); + const [client1, client2] = clients; + const client2PeerId = (await client2.getEnr()?.peerId())!; + + // Give the nodes time to discover each other + await sleep(6000); + + const penalizePeerSpy = jest.spyOn((client1 as any).p2pService.peerManager, 'penalizePeer'); + + // Perform a get tx request from client 1 + const tx = await mockTx(); + const txHash = await tx.getTxHash(); + const tx2 = await mockTx(420); + + // Return an invalid tx + txPool.getTxByHash.mockImplementationOnce(() => Promise.resolve(tx2)); + + const requestedTx = await client1.requestTxByHash(txHash); + // Even though we got a response, the proof was deemed invalid + expect(requestedTx).toBeUndefined(); + + // Received wrong tx + expect(penalizePeerSpy).toHaveBeenCalledWith(client2PeerId, PeerErrorSeverity.MidToleranceError); + + await shutdown(clients); + }, + TEST_TIMEOUT, + ); + }); +}); diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts index c5be25ae8e5..16d02482d9b 100644 --- a/yarn-project/p2p/src/config.ts +++ b/yarn-project/p2p/src/config.ts @@ -30,6 +30,16 @@ export interface P2PConfig extends P2PReqRespConfig, ChainConfig { */ blockRequestBatchSize: number; + /** + * DEBUG: Disable message validation - for testing purposes only + */ + debugDisableMessageValidation: boolean; + + /** + * DEBUG: Disable colocation penalty - for testing purposes only + */ + debugDisableColocationPenalty: boolean; + /** * The frequency in which to check for new peers. */ @@ -119,6 +129,16 @@ export interface P2PConfig extends P2PReqRespConfig, ChainConfig { */ gossipsubDhi: number; + /** + * The Dlazy parameter for the gossipsub protocol. + */ + gossipsubDLazy: number; + + /** + * Whether to flood publish messages. - For testing purposes only + */ + gossipsubFloodPublish: boolean; + /** * The number of gossipsub interval message cache windows to keep. */ @@ -169,6 +189,16 @@ export const p2pConfigMappings: ConfigMappingsType = { description: 'The frequency in which to check for new L2 blocks.', ...numberConfigHelper(100), }, + debugDisableMessageValidation: { + env: 'DEBUG_P2P_DISABLE_MESSAGE_VALIDATION', + description: 'DEBUG: Disable message validation - NEVER set to true in production', + ...booleanConfigHelper(false), + }, + debugDisableColocationPenalty: { + env: 'DEBUG_P2P_DISABLE_COLOCATION_PENALTY', + description: 'DEBUG: Disable colocation penalty - NEVER set to true in production', + ...booleanConfigHelper(false), + }, peerCheckIntervalMS: { env: 'P2P_PEER_CHECK_INTERVAL_MS', description: 'The frequency in which to check for new peers.', @@ -248,7 +278,7 @@ export const p2pConfigMappings: ConfigMappingsType = { gossipsubInterval: { env: 'P2P_GOSSIPSUB_INTERVAL_MS', description: 'The interval of the gossipsub heartbeat to perform maintenance tasks.', - ...numberConfigHelper(1_000), + ...numberConfigHelper(700), }, gossipsubD: { env: 'P2P_GOSSIPSUB_D', @@ -265,10 +295,20 @@ export const p2pConfigMappings: ConfigMappingsType = { description: 'The Dhi parameter for the gossipsub protocol.', ...numberConfigHelper(12), }, + gossipsubDLazy: { + env: 'P2P_GOSSIPSUB_DLAZY', + description: 'The Dlazy parameter for the gossipsub protocol.', + ...numberConfigHelper(6), + }, + gossipsubFloodPublish: { + env: 'P2P_GOSSIPSUB_FLOOD_PUBLISH', + description: 'Whether to flood publish messages. - For testing purposes only', + ...booleanConfigHelper(true), + }, gossipsubMcacheLength: { env: 'P2P_GOSSIPSUB_MCACHE_LENGTH', description: 'The number of gossipsub interval message cache windows to keep.', - ...numberConfigHelper(5), + ...numberConfigHelper(6), }, gossipsubMcacheGossip: { env: 'P2P_GOSSIPSUB_MCACHE_GOSSIP', diff --git a/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts b/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts index 098e55a8e83..67d5207bc1c 100644 --- a/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts +++ b/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts @@ -1,10 +1,10 @@ import { type BlockAttestation, type P2PValidator, PeerErrorSeverity } from '@aztec/circuit-types'; -import { type EpochCache } from '@aztec/epoch-cache'; +import { type EpochCacheInterface } from '@aztec/epoch-cache'; export class AttestationValidator implements P2PValidator { - private epochCache: EpochCache; + private epochCache: EpochCacheInterface; - constructor(epochCache: EpochCache) { + constructor(epochCache: EpochCacheInterface) { this.epochCache = epochCache; } diff --git a/yarn-project/p2p/src/msg_validators/block_proposal_validator/block_proposal_validator.ts b/yarn-project/p2p/src/msg_validators/block_proposal_validator/block_proposal_validator.ts index e3f1da35a70..7286165b8e2 100644 --- a/yarn-project/p2p/src/msg_validators/block_proposal_validator/block_proposal_validator.ts +++ b/yarn-project/p2p/src/msg_validators/block_proposal_validator/block_proposal_validator.ts @@ -1,10 +1,10 @@ import { type BlockProposal, type P2PValidator, PeerErrorSeverity } from '@aztec/circuit-types'; -import { type EpochCache } from '@aztec/epoch-cache'; +import { type EpochCacheInterface } from '@aztec/epoch-cache'; export class BlockProposalValidator implements P2PValidator { - private epochCache: EpochCache; + private epochCache: EpochCacheInterface; - constructor(epochCache: EpochCache) { + constructor(epochCache: EpochCacheInterface) { this.epochCache = epochCache; } diff --git a/yarn-project/p2p/src/msg_validators/epoch_proof_quote_validator/epoch_proof_quote_validator.ts b/yarn-project/p2p/src/msg_validators/epoch_proof_quote_validator/epoch_proof_quote_validator.ts index 99eec5be73d..826334d18eb 100644 --- a/yarn-project/p2p/src/msg_validators/epoch_proof_quote_validator/epoch_proof_quote_validator.ts +++ b/yarn-project/p2p/src/msg_validators/epoch_proof_quote_validator/epoch_proof_quote_validator.ts @@ -1,10 +1,10 @@ import { type EpochProofQuote, type P2PValidator, PeerErrorSeverity } from '@aztec/circuit-types'; -import { type EpochCache } from '@aztec/epoch-cache'; +import { type EpochCacheInterface } from '@aztec/epoch-cache'; export class EpochProofQuoteValidator implements P2PValidator { - private epochCache: EpochCache; + private epochCache: EpochCacheInterface; - constructor(epochCache: EpochCache) { + constructor(epochCache: EpochCacheInterface) { this.epochCache = epochCache; } diff --git a/yarn-project/p2p/src/services/discv5/discV5_service.ts b/yarn-project/p2p/src/services/discv5/discV5_service.ts index 7ecf7c331eb..cc0964b8bd9 100644 --- a/yarn-project/p2p/src/services/discv5/discV5_service.ts +++ b/yarn-project/p2p/src/services/discv5/discV5_service.ts @@ -35,7 +35,7 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService private currentState = PeerDiscoveryState.STOPPED; - private bootstrapNodes: string[]; + public readonly bootstrapNodes: string[] = []; private bootstrapNodePeerIds: PeerId[] = []; private startTime = 0; @@ -48,7 +48,7 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService ) { super(); const { tcpAnnounceAddress, udpAnnounceAddress, udpListenAddress, bootstrapNodes } = config; - this.bootstrapNodes = bootstrapNodes; + this.bootstrapNodes = bootstrapNodes ?? []; // create ENR from PeerId this.enr = SignableENR.createFromPeerId(peerId); // Add aztec identification to ENR diff --git a/yarn-project/p2p/src/services/dummy_service.ts b/yarn-project/p2p/src/services/dummy_service.ts index 1f74cc796f6..68d3167b766 100644 --- a/yarn-project/p2p/src/services/dummy_service.ts +++ b/yarn-project/p2p/src/services/dummy_service.ts @@ -88,6 +88,8 @@ export class DummyP2PService implements P2PService { */ export class DummyPeerDiscoveryService extends EventEmitter implements PeerDiscoveryService { private currentState = PeerDiscoveryState.STOPPED; + public bootstrapNodes: string[] = []; + /** * Starts the dummy implementation. * @returns A resolved promise. diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_logger.ts b/yarn-project/p2p/src/services/libp2p/libp2p_logger.ts new file mode 100644 index 00000000000..02dc823c663 --- /dev/null +++ b/yarn-project/p2p/src/services/libp2p/libp2p_logger.ts @@ -0,0 +1,78 @@ +import { createLogger } from '@aztec/foundation/log'; + +import { type ComponentLogger, type Logger } from '@libp2p/interface'; + +/** + * Creates a libp2p compatible logger that wraps our pino logger. + * This adapter implements the ComponentLogger interface required by libp2p. + */ +export function createLibp2pComponentLogger(namespace: string, fixedTerms = {}): ComponentLogger { + return { + forComponent: (component: string) => createLibp2pLogger(`${namespace}:${component}`, fixedTerms), + }; +} + +function createLibp2pLogger(component: string, fixedTerms = {}): Logger { + const logger = createLogger(component, fixedTerms); + + // Default log level is trace as this is super super noisy + const logFn = (formatter: any, ...args: any[]) => { + // Handle %p format specifier by manually replacing with args + if (typeof formatter === 'string' && args.length > 0) { + // Handle %p, %a, %s and %d format specifiers + const parts = formatter.split(/(%p|%a|%s|%d)/); + let result = parts[0]; + let argIndex = 0; + + for (let i = 1; i < parts.length; i += 2) { + if (argIndex < args.length) { + result += String(args[argIndex]) + (parts[i + 1] || ''); + argIndex++; + } + } + + formatter = result; + // Only keep non-format args as data + args = args.slice(argIndex); + } + + // Handle object args by spreading them, but only if they weren't used in formatting + if (args.length === 1 && typeof args[0] === 'object') { + logger.trace(formatter, args[0]); + } else if (args.length > 0) { + // If we have remaining args after formatting, pass them as data + logger.trace(formatter, { _args: args }); + } else { + logger.trace(formatter); + } + }; + + return Object.assign(logFn, { + enabled: logger.isLevelEnabled('debug'), + + error(...args: any[]) { + const [msg, ...rest] = args; + logger.error(msg as string, ...rest); + }, + + debug(...args: any[]) { + const [msg, ...rest] = args; + logger.debug(msg as string, ...rest); + }, + + info(...args: any[]) { + const [msg, ...rest] = args; + logger.info(msg as string, ...rest); + }, + + warn(...args: any[]) { + const [msg, ...rest] = args; + logger.warn(msg as string, ...rest); + }, + + trace(...args: any[]) { + const [msg, ...rest] = args; + logger.trace(msg as string, ...rest); + }, + }); +} diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index b30c5760328..dbfb9dc1ec5 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -19,7 +19,7 @@ import { metricsTopicStrToLabels, } from '@aztec/circuit-types'; import { Fr } from '@aztec/circuits.js'; -import { type EpochCache } from '@aztec/epoch-cache'; +import { type EpochCacheInterface } from '@aztec/epoch-cache'; import { createLogger } from '@aztec/foundation/log'; import { SerialQueue } from '@aztec/foundation/queue'; import { RunningPromise } from '@aztec/foundation/running-promise'; @@ -34,8 +34,10 @@ import { gossipsub, } from '@chainsafe/libp2p-gossipsub'; import { createPeerScoreParams, createTopicScoreParams } from '@chainsafe/libp2p-gossipsub/score'; +import { SignaturePolicy } from '@chainsafe/libp2p-gossipsub/types'; import { noise } from '@chainsafe/libp2p-noise'; import { yamux } from '@chainsafe/libp2p-yamux'; +import { bootstrap } from '@libp2p/bootstrap'; import { identify } from '@libp2p/identify'; import { type Message, type PeerId, TopicValidatorResult } from '@libp2p/interface'; import { type ConnectionManager } from '@libp2p/interface-internal'; @@ -65,6 +67,7 @@ import { pingHandler, reqRespBlockHandler, reqRespTxHandler, statusHandler } fro import { ReqResp } from '../reqresp/reqresp.js'; import type { P2PService, PeerDiscoveryService } from '../service.js'; import { GossipSubEvent } from '../types.js'; +import { createLibp2pComponentLogger } from './libp2p_logger.js'; interface MessageValidator { validator: { @@ -111,7 +114,7 @@ export class LibP2PService extends WithTracer implement private peerDiscoveryService: PeerDiscoveryService, private mempools: MemPools, private l2BlockSource: L2BlockSource, - epochCache: EpochCache, + epochCache: EpochCacheInterface, private proofVerifier: ClientProtocolCircuitVerifier, private worldStateSynchronizer: WorldStateSynchronizer, telemetry: TelemetryClient, @@ -127,7 +130,7 @@ export class LibP2PService extends WithTracer implement peerDiscoveryService, config, telemetry, - logger, + createLogger(`${logger.module}:peer_manager`), peerScoring, this.reqresp, ); @@ -164,11 +167,12 @@ export class LibP2PService extends WithTracer implement peerId: PeerId, mempools: MemPools, l2BlockSource: L2BlockSource, - epochCache: EpochCache, + epochCache: EpochCacheInterface, proofVerifier: ClientProtocolCircuitVerifier, worldStateSynchronizer: WorldStateSynchronizer, store: AztecAsyncKVStore, telemetry: TelemetryClient, + logger = createLogger('p2p:libp2p_service'), ) { const { tcpListenAddress, tcpAnnounceAddress, minPeerCount, maxPeerCount } = config; const bindAddrTcp = convertToMultiaddr(tcpListenAddress, 'tcp'); @@ -179,6 +183,12 @@ export class LibP2PService extends WithTracer implement const otelMetricsAdapter = new OtelMetricsAdapter(telemetry); + // If bootstrap nodes are provided, also provide them to the p2p service + const peerDiscovery = []; + if (peerDiscoveryService.bootstrapNodes.length > 0) { + peerDiscovery.push(bootstrap({ list: peerDiscoveryService.bootstrapNodes })); + } + const node = await createLibp2p({ start: false, peerId, @@ -200,24 +210,35 @@ export class LibP2PService extends WithTracer implement }), ], datastore, - streamMuxers: [yamux(), mplex()], + peerDiscovery, + streamMuxers: [mplex(), yamux()], connectionEncryption: [noise()], connectionManager: { minConnections: minPeerCount, maxConnections: maxPeerCount, + + maxParallelDials: 100, + maxPeerAddrsToDial: 5, + maxIncomingPendingConnections: 5, }, services: { identify: identify({ protocolPrefix: 'aztec', }), pubsub: gossipsub({ + debugName: 'gossipsub', + globalSignaturePolicy: SignaturePolicy.StrictNoSign, allowPublishToZeroTopicPeers: true, + floodPublish: config.gossipsubFloodPublish, D: config.gossipsubD, Dlo: config.gossipsubDlo, Dhi: config.gossipsubDhi, + Dlazy: config.gossipsubDLazy, heartbeatInterval: config.gossipsubInterval, mcacheLength: config.gossipsubMcacheLength, mcacheGossip: config.gossipsubMcacheGossip, + // Increased from default 3s to give time for input lag: configuration and rationale from lodestar + gossipsubIWantFollowupMs: 12 * 1000, msgIdFn: getMsgIdFn, msgIdToStrFn: msgIdToStrFn, fastMsgIdFn: fastMsgIdFn, @@ -226,6 +247,8 @@ export class LibP2PService extends WithTracer implement metricsTopicStrToLabel: metricsTopicStrToLabels(), asyncValidation: true, scoreParams: createPeerScoreParams({ + // IPColocation factor can be disabled for local testing - default to -5 + IPColocationFactorWeight: config.debugDisableColocationPenalty ? 0 : -5.0, topics: { [Tx.p2pTopic]: createTopicScoreParams({ topicWeight: 1, @@ -254,6 +277,8 @@ export class LibP2PService extends WithTracer implement connectionManager: components.connectionManager, }), }, + // Fix the peer id in libp2p logs so we can see the source of the log + logger: createLibp2pComponentLogger(logger.module, { sourcePeerId: peerId }), }); return new LibP2PService( @@ -267,6 +292,7 @@ export class LibP2PService extends WithTracer implement proofVerifier, worldStateSynchronizer, telemetry, + logger, ); } @@ -319,8 +345,16 @@ export class LibP2PService extends WithTracer implement [BlockProposal.p2pTopic]: this.validatePropagatedBlockFromMessage.bind(this), [EpochProofQuote.p2pTopic]: this.validatePropagatedEpochProofQuoteFromMessage.bind(this), }; - for (const [topic, validator] of Object.entries(topicValidators)) { - this.node.services.pubsub.topicValidators.set(topic, validator); + // When running bandwidth benchmarks, we use send blobs of data we do not want to validate + // NEVER switch this off in production + if (!this.config.debugDisableMessageValidation) { + for (const [topic, validator] of Object.entries(topicValidators)) { + this.node.services.pubsub.topicValidators.set(topic, validator); + } + } else { + this.logger.warn( + 'MESSAGE VALIDATION DISABLED - IF YOU SEE THIS LOG AND ARE NOT DEBUGGING AND ARE RUNNING IN A PRODUCTION ENVIRONMENT, PLEASE RE-ENABLE MESSAGE VALIDATION', + ); } // add GossipSub listener @@ -904,7 +938,10 @@ export class LibP2PService extends WithTracer implement this.logger.trace(`Sending message ${identifier}`, { p2pMessageIdentifier: identifier }); const recipientsNum = await this.publishToTopic(parent.p2pTopic, message.toBuffer()); - this.logger.debug(`Sent message ${identifier} to ${recipientsNum} peers`, { p2pMessageIdentifier: identifier }); + this.logger.debug(`Sent message ${identifier} to ${recipientsNum} peers`, { + p2pMessageIdentifier: identifier, + sourcePeer: this.node.peerId.toString(), + }); } // Libp2p seems to hang sometimes if new peers are initiating connections. diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts index 888e9e8d2cd..ce91853f988 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts @@ -95,7 +95,7 @@ export function reqGoodbyeHandler(peerManager: PeerManager): ReqRespSubProtocolH peerManager.goodbyeReceived(peerId, reason); - // Return a buffer of length 1 as an acknowledgement + // Return a buffer of length 1 as an acknowledgement: this is allowed to fail return Promise.resolve(Buffer.from([0x0])); }; } diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts deleted file mode 100644 index e9b31b8a9d6..00000000000 --- a/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts +++ /dev/null @@ -1,277 +0,0 @@ -// An integration test for the p2p client to test req resp protocols -import { MockL2BlockSource } from '@aztec/archiver/test'; -import { - type ClientProtocolCircuitVerifier, - P2PClientType, - PeerErrorSeverity, - type Tx, - type WorldStateSynchronizer, - mockTx, -} from '@aztec/circuit-types'; -import { emptyChainConfig } from '@aztec/circuit-types/config'; -import { type EpochCache } from '@aztec/epoch-cache'; -import { type Logger, createLogger } from '@aztec/foundation/log'; -import { sleep } from '@aztec/foundation/sleep'; -import { type AztecAsyncKVStore } from '@aztec/kv-store'; -import { type DataStoreConfig } from '@aztec/kv-store/config'; -import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; - -import { SignableENR } from '@chainsafe/enr'; -import { describe, expect, it, jest } from '@jest/globals'; -import { multiaddr } from '@multiformats/multiaddr'; -import getPort from 'get-port'; -import { type MockProxy, mock } from 'jest-mock-extended'; -import { generatePrivateKey } from 'viem/accounts'; - -import { createP2PClient } from '../../client/index.js'; -import { type P2PClient } from '../../client/p2p_client.js'; -import { type P2PConfig, getP2PDefaultConfig } from '../../config.js'; -import { type AttestationPool } from '../../mem_pools/attestation_pool/attestation_pool.js'; -import { type EpochProofQuotePool } from '../../mem_pools/epoch_proof_quote_pool/epoch_proof_quote_pool.js'; -import { type TxPool } from '../../mem_pools/tx_pool/index.js'; -import { AlwaysFalseCircuitVerifier, AlwaysTrueCircuitVerifier } from '../../mocks/index.js'; -import { convertToMultiaddr, createLibP2PPeerIdFromPrivateKey } from '../../util.js'; -import { setAztecEnrKey } from '../../versioning.js'; - -const TEST_TIMEOUT = 80000; - -function generatePeerIdPrivateKeys(numberOfPeers: number): string[] { - const peerIdPrivateKeys: string[] = []; - for (let i = 0; i < numberOfPeers; i++) { - // magic number is multiaddr prefix: https://multiformats.io/multiaddr/ - peerIdPrivateKeys.push('08021220' + generatePrivateKey().slice(2, 68)); - } - return peerIdPrivateKeys; -} - -const NUMBER_OF_PEERS = 2; - -describe('Req Resp p2p client integration', () => { - let txPool: MockProxy; - let attestationPool: MockProxy; - let epochProofQuotePool: MockProxy; - let epochCache: MockProxy; - let l2BlockSource: MockL2BlockSource; - let kvStore: AztecAsyncKVStore; - let worldState: WorldStateSynchronizer; - let proofVerifier: ClientProtocolCircuitVerifier; - let baseConfig: P2PConfig; - let logger: Logger; - - beforeEach(() => { - txPool = mock(); - attestationPool = mock(); - epochProofQuotePool = mock(); - epochCache = mock(); - - baseConfig = { ...emptyChainConfig, ...getP2PDefaultConfig() }; - logger = createLogger('p2p:test:client-integration'); - - txPool.getAllTxs.mockImplementation(() => { - return Promise.resolve([] as Tx[]); - }); - }); - - const getPorts = (numberOfPeers: number) => Promise.all(Array.from({ length: numberOfPeers }, () => getPort())); - - const createClients = async (numberOfPeers: number, alwaysTrueVerifier: boolean = true): Promise => { - const clients: P2PClient[] = []; - const peerIdPrivateKeys = generatePeerIdPrivateKeys(numberOfPeers); - - const ports = await getPorts(numberOfPeers); - - const peerEnrs = await Promise.all( - peerIdPrivateKeys.map(async (pk, i) => { - const peerId = await createLibP2PPeerIdFromPrivateKey(pk); - const enr = SignableENR.createFromPeerId(peerId); - - const udpAnnounceAddress = `127.0.0.1:${ports[i]}`; - const tcpAnnounceAddress = `127.0.0.1:${ports[i]}`; - const udpPublicAddr = multiaddr(convertToMultiaddr(udpAnnounceAddress, 'udp')); - const tcpPublicAddr = multiaddr(convertToMultiaddr(tcpAnnounceAddress, 'tcp')); - - // ENRS must include the network and a discoverable address (udp for discv5) - setAztecEnrKey(enr, baseConfig); - enr.setLocationMultiaddr(udpPublicAddr); - enr.setLocationMultiaddr(tcpPublicAddr); - - return enr.encodeTxt(); - }), - ); - - for (let i = 0; i < numberOfPeers; i++) { - // Note these bindings are important - const addr = `127.0.0.1:${ports[i]}`; - const listenAddr = `0.0.0.0:${ports[i]}`; - - // Filter nodes so that we only dial active peers - const otherNodes = peerEnrs.filter((_, ind) => ind < i); - - const config: P2PConfig & DataStoreConfig = { - ...baseConfig, - p2pEnabled: true, - peerIdPrivateKey: peerIdPrivateKeys[i], - tcpListenAddress: listenAddr, // run on port 0 - udpListenAddress: listenAddr, - tcpAnnounceAddress: addr, - udpAnnounceAddress: addr, - bootstrapNodes: [...otherNodes], - peerCheckIntervalMS: 1000, - minPeerCount: 1, - maxPeerCount: 10, - } as P2PConfig & DataStoreConfig; - - l2BlockSource = new MockL2BlockSource(); - await l2BlockSource.createBlocks(100); - - proofVerifier = alwaysTrueVerifier ? new AlwaysTrueCircuitVerifier() : new AlwaysFalseCircuitVerifier(); - kvStore = await openTmpStore('test'); - const deps = { - txPool: txPool as unknown as TxPool, - attestationPool: attestationPool as unknown as AttestationPool, - epochProofQuotePool: epochProofQuotePool as unknown as EpochProofQuotePool, - store: kvStore, - }; - const client = await createP2PClient( - P2PClientType.Full, - config, - l2BlockSource, - proofVerifier, - worldState, - epochCache, - undefined, - deps, - ); - - await client.start(); - clients.push(client); - - logger.info(`Creating client ${i}`); - } - - logger.info(`Created ${NUMBER_OF_PEERS} clients`); - await Promise.all(clients.map(client => client.isReady())); - logger.info(`Clients ready`); - return clients; - }; - - // Shutdown all test clients - const shutdown = async (clients: P2PClient[]) => { - await Promise.all([...clients.map(client => client.stop())]); - await sleep(1000); - }; - - it( - 'Returns undefined if unable to find a transaction from another peer', - async () => { - // We want to create a set of nodes and request transaction from them - // Not using a before each as a the wind down is not working as expected - const clients = await createClients(NUMBER_OF_PEERS); - const [client1] = clients; - - await sleep(2000); - - // Perform a get tx request from client 1 - const tx = await mockTx(); - const txHash = await tx.getTxHash(); - - const requestedTx = await client1.requestTxByHash(txHash); - expect(requestedTx).toBeUndefined(); - - // await shutdown(clients, bootstrapNode); - await shutdown(clients); - }, - TEST_TIMEOUT, - ); - - it( - 'Can request a transaction from another peer', - async () => { - // We want to create a set of nodes and request transaction from them - const clients = await createClients(NUMBER_OF_PEERS); - const [client1] = clients; - - // Give the nodes time to discover each other - await sleep(6000); - - // Perform a get tx request from client 1 - const tx = await mockTx(); - const txHash = await tx.getTxHash(); - // Mock the tx pool to return the tx we are looking for - txPool.getTxByHash.mockImplementationOnce(() => Promise.resolve(tx)); - - const requestedTx = await client1.requestTxByHash(txHash); - - // Expect the tx to be the returned tx to be the same as the one we mocked - expect(requestedTx?.toBuffer()).toStrictEqual(tx.toBuffer()); - - await shutdown(clients); - }, - TEST_TIMEOUT, - ); - - it( - 'Will penalize peers that send invalid proofs', - async () => { - // We want to create a set of nodes and request transaction from them - const clients = await createClients(NUMBER_OF_PEERS, /*valid proofs*/ false); - const [client1, client2] = clients; - const client2PeerId = await client2.getEnr()!.peerId(); - - // Give the nodes time to discover each other - await sleep(6000); - - const penalizePeerSpy = jest.spyOn((client1 as any).p2pService.peerManager, 'penalizePeer'); - - // Perform a get tx request from client 1 - const tx = await mockTx(); - const txHash = await tx.getTxHash(); - - // Return the correct tx with an invalid proof -> active attack - txPool.getTxByHash.mockImplementationOnce(() => Promise.resolve(tx)); - - const requestedTx = await client1.requestTxByHash(txHash); - // Even though we got a response, the proof was deemed invalid - expect(requestedTx).toBeUndefined(); - - // Low tolerance error is due to the invalid proof - expect(penalizePeerSpy).toHaveBeenCalledWith(client2PeerId, PeerErrorSeverity.LowToleranceError); - - await shutdown(clients); - }, - TEST_TIMEOUT, - ); - - it( - 'Will penalize peers that send the wrong transaction', - async () => { - // We want to create a set of nodes and request transaction from them - const clients = await createClients(NUMBER_OF_PEERS, /*Valid proofs*/ true); - const [client1, client2] = clients; - const client2PeerId = (await client2.getEnr()?.peerId())!; - - // Give the nodes time to discover each other - await sleep(6000); - - const penalizePeerSpy = jest.spyOn((client1 as any).p2pService.peerManager, 'penalizePeer'); - - // Perform a get tx request from client 1 - const tx = await mockTx(); - const txHash = await tx.getTxHash(); - const tx2 = await mockTx(420); - - // Return an invalid tx - txPool.getTxByHash.mockImplementationOnce(() => Promise.resolve(tx2)); - - const requestedTx = await client1.requestTxByHash(txHash); - // Even though we got a response, the proof was deemed invalid - expect(requestedTx).toBeUndefined(); - - // Received wrong tx - expect(penalizePeerSpy).toHaveBeenCalledWith(client2PeerId, PeerErrorSeverity.MidToleranceError); - - await shutdown(clients); - }, - TEST_TIMEOUT, - ); -}); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index e441bad2efb..50693859a10 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -16,7 +16,7 @@ import { createNodes, startNodes, stopNodes, -} from '../../mocks/index.js'; +} from '../../test-helpers/reqresp-nodes.js'; import { type PeerManager } from '../peer-manager/peer_manager.js'; import { type PeerScoring } from '../peer-manager/peer_scoring.js'; import { ReqRespSubProtocol, RequestableBuffer } from './interface.js'; @@ -354,9 +354,8 @@ describe('ReqResp', () => { GoodByeReason.SHUTDOWN, ); - // Expect the response to be a buffer of length 1 - expect(response?.status).toEqual(ReqRespStatus.SUCCESS); - expect(response?.data).toEqual(Buffer.from([0x0])); + // Expect no response to be sent - we categorize as unknown + expect(response?.status).toEqual(ReqRespStatus.UNKNOWN); }); }); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index ea82bdfa4ab..677bf94ff49 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -598,6 +598,8 @@ export class ReqResp { const handler = this.subProtocolHandlers[protocol]; const transform = this.snappyTransform; + this.logger.info(`Stream handler for ${protocol}`); + await pipe( stream, async function* (source: any) { @@ -605,6 +607,12 @@ export class ReqResp { const msg = Buffer.from(chunkList.subarray()); const response = await handler(connection.remotePeer, msg); + if (protocol === ReqRespSubProtocol.GOODBYE) { + // Don't respond + await stream.close(); + return; + } + // Send success code first, then the response const successChunk = Buffer.from([ReqRespStatus.SUCCESS]); yield new Uint8Array(successChunk); @@ -615,7 +623,7 @@ export class ReqResp { stream, ); } catch (e: any) { - this.logger.warn(e); + this.logger.warn('Reqresp Response error: ', e); this.metrics.recordResponseError(protocol); // If we receive a known error, we use the error status in the response chunk, otherwise we categorize as unknown diff --git a/yarn-project/p2p/src/services/service.ts b/yarn-project/p2p/src/services/service.ts index 91169b49094..d911b5699bb 100644 --- a/yarn-project/p2p/src/services/service.ts +++ b/yarn-project/p2p/src/services/service.ts @@ -106,4 +106,6 @@ export interface PeerDiscoveryService extends EventEmitter { getStatus(): PeerDiscoveryState; getEnr(): ENR | undefined; + + bootstrapNodes: string[]; } diff --git a/yarn-project/p2p/src/test-helpers/generate-peer-id-private-keys.ts b/yarn-project/p2p/src/test-helpers/generate-peer-id-private-keys.ts new file mode 100644 index 00000000000..cfeb1f82c3d --- /dev/null +++ b/yarn-project/p2p/src/test-helpers/generate-peer-id-private-keys.ts @@ -0,0 +1,15 @@ +import { generatePrivateKey } from 'viem/accounts'; + +/** + * Generate a list of peer id private keys + * @param numberOfPeers - The number of peer id private keys to generate + * @returns A list of peer id private keys + */ +export function generatePeerIdPrivateKeys(numberOfPeers: number): string[] { + const peerIdPrivateKeys: string[] = []; + for (let i = 0; i < numberOfPeers; i++) { + // magic number is multiaddr prefix: https://multiformats.io/multiaddr/ + peerIdPrivateKeys.push('08021220' + generatePrivateKey().slice(2, 68)); + } + return peerIdPrivateKeys; +} diff --git a/yarn-project/p2p/src/test-helpers/get-ports.ts b/yarn-project/p2p/src/test-helpers/get-ports.ts new file mode 100644 index 00000000000..d71f425cca6 --- /dev/null +++ b/yarn-project/p2p/src/test-helpers/get-ports.ts @@ -0,0 +1,8 @@ +import getPort from 'get-port'; + +/** + * Get a list of ports for a given number of peers + * @param numberOfPeers - The number of peers to get ports for + * @returns A list of ports + */ +export const getPorts = (numberOfPeers: number) => Promise.all(Array.from({ length: numberOfPeers }, () => getPort())); diff --git a/yarn-project/p2p/src/test-helpers/index.ts b/yarn-project/p2p/src/test-helpers/index.ts new file mode 100644 index 00000000000..7003fb36451 --- /dev/null +++ b/yarn-project/p2p/src/test-helpers/index.ts @@ -0,0 +1,5 @@ +export * from './generate-peer-id-private-keys.js'; +export * from './get-ports.js'; +export * from './make-enrs.js'; +export * from './make-test-p2p-clients.js'; +export * from './reqresp-nodes.js'; diff --git a/yarn-project/p2p/src/test-helpers/make-enrs.ts b/yarn-project/p2p/src/test-helpers/make-enrs.ts new file mode 100644 index 00000000000..854a2d7e676 --- /dev/null +++ b/yarn-project/p2p/src/test-helpers/make-enrs.ts @@ -0,0 +1,44 @@ +import { type ChainConfig } from '@aztec/circuit-types/config'; + +import { SignableENR } from '@chainsafe/enr'; +import { multiaddr } from '@multiformats/multiaddr'; + +import { convertToMultiaddr, createLibP2PPeerIdFromPrivateKey } from '../util.js'; +import { setAztecEnrKey } from '../versioning.js'; + +/** + * Make a list of ENRs for a given list of p2p private keys and ports + * @param p2pPrivateKeys - The private keys of the p2p nodes + * @param ports - The ports of the p2p nodes + * @returns A list of ENRs + */ +export async function makeEnrs(p2pPrivateKeys: string[], ports: number[], config: ChainConfig) { + return await Promise.all( + p2pPrivateKeys.map((pk, i) => { + return makeEnr(pk, ports[i], config); + }), + ); +} + +/** + * Make an ENR for a given p2p private key and port + * @param p2pPrivateKey - The private key of the p2p node + * @param port - The port of the p2p node + * @returns The ENR of the p2p node + */ +export async function makeEnr(p2pPrivateKey: string, port: number, config: ChainConfig) { + const peerId = await createLibP2PPeerIdFromPrivateKey(p2pPrivateKey); + const enr = SignableENR.createFromPeerId(peerId); + + const udpAnnounceAddress = `127.0.0.1:${port}`; + const tcpAnnounceAddress = `127.0.0.1:${port}`; + const udpPublicAddr = multiaddr(convertToMultiaddr(udpAnnounceAddress, 'udp')); + const tcpPublicAddr = multiaddr(convertToMultiaddr(tcpAnnounceAddress, 'tcp')); + + // ENRS must include the network and a discoverable address (udp for discv5) + setAztecEnrKey(enr, config); + enr.setLocationMultiaddr(udpPublicAddr); + enr.setLocationMultiaddr(tcpPublicAddr); + + return enr.encodeTxt(); +} diff --git a/yarn-project/p2p/src/test-helpers/make-test-p2p-clients.ts b/yarn-project/p2p/src/test-helpers/make-test-p2p-clients.ts new file mode 100644 index 00000000000..a5d70c26df2 --- /dev/null +++ b/yarn-project/p2p/src/test-helpers/make-test-p2p-clients.ts @@ -0,0 +1,125 @@ +import { MockL2BlockSource } from '@aztec/archiver/test'; +import { P2PClientType, type WorldStateSynchronizer } from '@aztec/circuit-types'; +import { type EpochCache } from '@aztec/epoch-cache'; +import { type Logger, createLogger } from '@aztec/foundation/log'; +import { type DataStoreConfig } from '@aztec/kv-store/config'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; + +import { createP2PClient } from '../client/index.js'; +import { type P2PClient } from '../client/p2p_client.js'; +import { type P2PConfig } from '../config.js'; +import { type AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; +import { type EpochProofQuotePool } from '../mem_pools/epoch_proof_quote_pool/epoch_proof_quote_pool.js'; +import { type TxPool } from '../mem_pools/tx_pool/index.js'; +import { generatePeerIdPrivateKeys } from '../test-helpers/generate-peer-id-private-keys.js'; +import { getPorts } from './get-ports.js'; +import { makeEnrs } from './make-enrs.js'; +import { AlwaysFalseCircuitVerifier, AlwaysTrueCircuitVerifier } from './reqresp-nodes.js'; + +interface MakeTestP2PClientOptions { + mockAttestationPool: AttestationPool; + mockEpochProofQuotePool: EpochProofQuotePool; + mockTxPool: TxPool; + mockEpochCache: EpochCache; + mockWorldState: WorldStateSynchronizer; + alwaysTrueVerifier?: boolean; + p2pBaseConfig: P2PConfig; + p2pConfigOverrides?: Partial; + logger?: Logger; +} + +/** + * Creates a single P2P client for testing purposes. + * @param peerIdPrivateKey - The private key of the peer. + * @param port - The port to run the client on. + * @param peers - The peers to connect to. + * @param options - The options for the client. + * @returns The created client. + */ +export async function makeTestP2PClient( + peerIdPrivateKey: string, + port: number, + peers: string[], + { + alwaysTrueVerifier = true, + p2pBaseConfig, + p2pConfigOverrides = {}, + mockAttestationPool, + mockEpochProofQuotePool, + mockTxPool, + mockEpochCache, + mockWorldState, + logger = createLogger('p2p-test-client'), + }: MakeTestP2PClientOptions, +) { + const addr = `127.0.0.1:${port}`; + const listenAddr = `0.0.0.0:${port}`; + + // Filter nodes so that we only dial active peers + + const config: P2PConfig & DataStoreConfig = { + ...p2pBaseConfig, + p2pEnabled: true, + peerIdPrivateKey, + tcpListenAddress: listenAddr, // run on port 0 + udpListenAddress: listenAddr, + tcpAnnounceAddress: addr, + udpAnnounceAddress: addr, + bootstrapNodes: peers, + peerCheckIntervalMS: 1000, + minPeerCount: 1, + maxPeerCount: 10, + ...p2pConfigOverrides, + } as P2PConfig & DataStoreConfig; + + const l2BlockSource = new MockL2BlockSource(); + await l2BlockSource.createBlocks(100); + + const proofVerifier = alwaysTrueVerifier ? new AlwaysTrueCircuitVerifier() : new AlwaysFalseCircuitVerifier(); + const kvStore = await openTmpStore('test'); + const deps = { + txPool: mockTxPool as unknown as TxPool, + attestationPool: mockAttestationPool as unknown as AttestationPool, + epochProofQuotePool: mockEpochProofQuotePool as unknown as EpochProofQuotePool, + store: kvStore, + logger, + }; + const client = await createP2PClient( + P2PClientType.Full, + config, + l2BlockSource, + proofVerifier, + mockWorldState, + mockEpochCache, + undefined, + deps, + ); + await client.start(); + + return client; +} + +/** + * Creates a number of P2P clients for testing purposes. + * @param numberOfPeers - The number of clients to create. + * @param options - The options for the clients. + * @returns The created clients. + */ +export async function makeTestP2PClients(numberOfPeers: number, testConfig: MakeTestP2PClientOptions) { + const clients: P2PClient[] = []; + const peerIdPrivateKeys = generatePeerIdPrivateKeys(numberOfPeers); + + const ports = await getPorts(numberOfPeers); + const peerEnrs = await makeEnrs(peerIdPrivateKeys, ports, testConfig.p2pBaseConfig); + + for (let i = 0; i < numberOfPeers; i++) { + const client = await makeTestP2PClient(peerIdPrivateKeys[i], ports[i], peerEnrs, { + ...testConfig, + logger: createLogger(`p2p:${i}`), + }); + clients.push(client); + } + + await Promise.all(clients.map(client => client.isReady())); + return clients; +} diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts similarity index 100% rename from yarn-project/p2p/src/mocks/index.ts rename to yarn-project/p2p/src/test-helpers/reqresp-nodes.ts diff --git a/yarn-project/p2p/src/testbench/README.md b/yarn-project/p2p/src/testbench/README.md new file mode 100644 index 00000000000..6d742543eb9 --- /dev/null +++ b/yarn-project/p2p/src/testbench/README.md @@ -0,0 +1,20 @@ +## P2P Test bench + +A testbench that runs only the P2P client on a number of worker threads, with the purpose of monitoring and testing the performance of the P2P client. + +### Running the testbench + +```bash +./run_testbench.sh +``` + +This will produce a LONG series of logs that can be used for further analysis. + +## TODO + +- Strongly parameterizing the testbench scripts +- Add traffic shaping options to the testbench +- Add log parsing step that can categorize a report in json of the propoagation of the message +- Add multiple different tx sizes +- Create ci pipeline that can run analysis on the logs and compare against previous runs +- Create a series of markdown reports detailing what each parameter change does and include graphs to compare performance diff --git a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts new file mode 100644 index 00000000000..328b595b67a --- /dev/null +++ b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts @@ -0,0 +1,156 @@ +/** + * A testbench worker that creates a p2p client and listens for commands from the parent. + * + * Used when running testbench commands + */ +import { MockL2BlockSource } from '@aztec/archiver/test'; +import { P2PClientType, Tx, TxStatus, type WorldStateSynchronizer } from '@aztec/circuit-types'; +import { type EpochCacheInterface } from '@aztec/epoch-cache'; +import { EthAddress } from '@aztec/foundation/eth-address'; +import { createLogger } from '@aztec/foundation/log'; +import { sleep } from '@aztec/foundation/sleep'; +import { type DataStoreConfig } from '@aztec/kv-store/config'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; + +import { type P2PConfig } from '../config.js'; +import { createP2PClient } from '../index.js'; +import { type AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; +import { type EpochProofQuotePool } from '../mem_pools/epoch_proof_quote_pool/epoch_proof_quote_pool.js'; +import { type TxPool } from '../mem_pools/tx_pool/index.js'; +import { AlwaysTrueCircuitVerifier } from '../test-helpers/reqresp-nodes.js'; + +// Simple mock implementation +function mockTxPool(): TxPool { + // Mock all methods + return { + addTxs: () => Promise.resolve(), + getTxByHash: () => Promise.resolve(undefined), + getArchivedTxByHash: () => Promise.resolve(undefined), + markAsMined: () => Promise.resolve(), + markMinedAsPending: () => Promise.resolve(), + deleteTxs: () => Promise.resolve(), + getAllTxs: () => Promise.resolve([]), + getAllTxHashes: () => Promise.resolve([]), + getPendingTxHashes: () => Promise.resolve([]), + getMinedTxHashes: () => Promise.resolve([]), + getTxStatus: () => Promise.resolve(TxStatus.PENDING), + }; +} + +function mockAttestationPool(): AttestationPool { + return { + addAttestations: () => Promise.resolve(), + deleteAttestations: () => Promise.resolve(), + deleteAttestationsOlderThan: () => Promise.resolve(), + deleteAttestationsForSlot: () => Promise.resolve(), + deleteAttestationsForSlotAndProposal: () => Promise.resolve(), + getAttestationsForSlot: () => Promise.resolve([]), + }; +} + +function mockEpochProofQuotePool(): EpochProofQuotePool { + return { + addQuote: () => {}, + getQuotes: () => [], + deleteQuotesToEpoch: () => {}, + }; +} + +function mockEpochCache(): EpochCacheInterface { + return { + getCommittee: () => Promise.resolve([] as EthAddress[]), + getProposerIndexEncoding: () => '0x' as `0x${string}`, + getEpochAndSlotNow: () => ({ epoch: 0n, slot: 0n, ts: 0n }), + computeProposerIndex: () => 0n, + getProposerInCurrentOrNextSlot: () => + Promise.resolve({ + currentProposer: EthAddress.ZERO, + nextProposer: EthAddress.ZERO, + currentSlot: 0n, + nextSlot: 0n, + }), + isInCommittee: () => Promise.resolve(false), + }; +} + +// eslint-disable-next-line @typescript-eslint/no-misused-promises +process.on('message', async msg => { + const { type, config, clientIndex } = msg as { type: string; config: P2PConfig; clientIndex: number }; + + try { + if (type === 'START') { + const txPool = mockTxPool(); + const attestationPool = mockAttestationPool(); + const epochProofQuotePool = mockEpochProofQuotePool(); + const epochCache = mockEpochCache(); + const worldState = {} as WorldStateSynchronizer; + const l2BlockSource = new MockL2BlockSource(); + await l2BlockSource.createBlocks(100); + + const proofVerifier = new AlwaysTrueCircuitVerifier(); + const kvStore = await openTmpStore(`test-${clientIndex}`); + const logger = createLogger(`p2p:${clientIndex}`); + + const deps = { + txPool, + attestationPool, + epochProofQuotePool, + store: kvStore, + logger, + }; + + const client = await createP2PClient( + P2PClientType.Full, + config as P2PConfig & DataStoreConfig, + l2BlockSource, + proofVerifier, + worldState, + epochCache, + undefined, + deps, + ); + + // Create spy for gossip messages + let gossipMessageCount = 0; + (client as any).p2pService.handleNewGossipMessage = (...args: any[]) => { + gossipMessageCount++; + process.send!({ type: 'GOSSIP_RECEIVED', count: gossipMessageCount }); + return (client as any).p2pService.constructor.prototype.handleNewGossipMessage.apply( + (client as any).p2pService, + args, + ); + }; + + await client.start(); + // Wait until the client is ready + for (let i = 0; i < 100; i++) { + const isReady = client.isReady(); + logger.debug(`Client ${clientIndex} isReady: ${isReady}`); + if (isReady) { + break; + } + await sleep(1000); + } + + // Listen for commands from parent + // eslint-disable-next-line @typescript-eslint/no-misused-promises + process.on('message', async (cmd: any) => { + switch (cmd.type) { + case 'STOP': + await client.stop(); + process.exit(0); + break; + case 'SEND_TX': + await client.sendTx(Tx.fromBuffer(Buffer.from(cmd.tx))); + process.send!({ type: 'TX_SENT' }); + break; + } + }); + + process.send!({ type: 'READY' }); + } + } catch (err: any) { + process.send!({ type: 'ERROR', error: err.message }); + process.exit(1); + } +}); diff --git a/yarn-project/p2p/src/testbench/scripts/run_testbench.sh b/yarn-project/p2p/src/testbench/scripts/run_testbench.sh new file mode 100644 index 00000000000..1be4e726f9b --- /dev/null +++ b/yarn-project/p2p/src/testbench/scripts/run_testbench.sh @@ -0,0 +1,7 @@ +## Test bench +# Run the testbench and pipe the output into a file +# Usage: ./run_testbench.sh + +outputfile=$1 + +LOG_LEVEL="debug; trace: .*gossipsub" yarn test testbench.test.ts 2>&1 | pino-pretty > $outputfile \ No newline at end of file diff --git a/yarn-project/p2p/src/testbench/testbench.test.ts b/yarn-project/p2p/src/testbench/testbench.test.ts new file mode 100644 index 00000000000..bbe0fe5f04f --- /dev/null +++ b/yarn-project/p2p/src/testbench/testbench.test.ts @@ -0,0 +1,144 @@ +import { emptyChainConfig } from '@aztec/circuit-types/config'; +import { createLogger } from '@aztec/foundation/log'; +import { sleep } from '@aztec/foundation/sleep'; + +import { type ChildProcess, fork } from 'child_process'; +import path from 'path'; +import { fileURLToPath } from 'url'; + +import { mockTx } from '../../../circuit-types/src/mocks.js'; +import { type P2PConfig, getP2PDefaultConfig } from '../config.js'; +import { generatePeerIdPrivateKeys } from '../test-helpers/generate-peer-id-private-keys.js'; +import { getPorts } from '../test-helpers/get-ports.js'; +import { makeEnrs } from '../test-helpers/make-enrs.js'; + +const __dirname = path.dirname(fileURLToPath(import.meta.url)); +const workerPath = path.join(__dirname, '../../dest/testbench/p2p_client_testbench_worker.js'); +const logger = createLogger('testbench'); + +describe.skip('Gossipsub', () => { + let processes: ChildProcess[]; + + let p2pBaseConfig: P2PConfig; + + beforeEach(() => { + processes = []; + p2pBaseConfig = { ...emptyChainConfig, ...getP2PDefaultConfig() }; + }); + + afterEach(async () => { + // Kill all child processes + await Promise.all( + processes.map( + proc => + new Promise(resolve => { + proc.once('exit', () => resolve()); + proc.send({ type: 'STOP' }); + }), + ), + ); + }); + + /** + * Creates a number of worker clients in separate processes + * All are configured to connect to each other and overrided with the test specific config + * + * @param numberOfClients - The number of clients to create + * @param p2pConfig - The P2P config to use for the clients + * @returns The ENRs of the created clients + */ + async function makeWorkerClients(numberOfClients: number, p2pConfig: Partial) { + const peerIdPrivateKeys = generatePeerIdPrivateKeys(numberOfClients); + const ports = await getPorts(numberOfClients); + const peerEnrs = await makeEnrs(peerIdPrivateKeys, ports, p2pBaseConfig); + + processes = []; + for (let i = 0; i < numberOfClients; i++) { + logger.info(`\n\n\n\n\n\n\nCreating client ${i}\n\n\n\n\n\n\n`); + const addr = `127.0.0.1:${ports[i]}`; + const listenAddr = `0.0.0.0:${ports[i]}`; + + // Maximum seed with 10 other peers to allow peer discovery to connect them at a smoother rate + const otherNodes = peerEnrs.filter((_, ind) => ind < Math.min(i, 10)); + + const config: P2PConfig = { + ...getP2PDefaultConfig(), + p2pEnabled: true, + peerIdPrivateKey: peerIdPrivateKeys[i], + tcpListenAddress: listenAddr, + udpListenAddress: listenAddr, + tcpAnnounceAddress: addr, + udpAnnounceAddress: addr, + bootstrapNodes: [...otherNodes], + ...p2pConfig, + }; + + const childProcess = fork(workerPath); + childProcess.send({ type: 'START', config, clientIndex: i }); + + // Wait for ready signal + await new Promise((resolve, reject) => { + childProcess.once('message', (msg: any) => { + if (msg.type === 'READY') { + resolve(undefined); + } + if (msg.type === 'ERROR') { + reject(new Error(msg.error)); + } + }); + }); + + processes.push(childProcess); + } + // Wait for peers to all connect with each other + await sleep(4000); + + return peerEnrs; + } + + it('Should propagate a tx to all peers with a throttled degree and large node set', async () => { + // No network partition, all nodes should receive + const numberOfClients = 20; + + // Setup clients in separate processes + const testConfig: Partial = { + minPeerCount: 0, + maxPeerCount: numberOfClients + 20, + gossipsubInterval: 700, + gossipsubD: 1, + gossipsubDlo: 1, + gossipsubDhi: 1, + peerCheckIntervalMS: 2500, + + // Increased + gossipsubMcacheGossip: 12, + gossipsubMcacheLength: 12, + }; + + await makeWorkerClients(numberOfClients, testConfig); + + // Track gossip message counts from all processes + const gossipCounts = new Map(); + processes.forEach((proc, i) => { + proc.on('message', (msg: any) => { + if (msg.type === 'GOSSIP_RECEIVED') { + gossipCounts.set(i, msg.count); + } + }); + }); + + // Send tx from client 3 + const tx = await mockTx(); + processes[0].send({ type: 'SEND_TX', tx: tx.toBuffer() }); + + // Give time for message propagation + await sleep(15000); + logger.info(`\n\n\n\n\n\n\nWoke up\n\n\n\n\n\n\n`); + + // Count how many processes received the message + const spiesTriggered = Array.from(gossipCounts.values()).filter(count => count > 0).length; + + // Expect all nodes apart from the one that sent it to receive the message + expect(spiesTriggered).toEqual(numberOfClients - 1); // All nodes apart from the one that sent it + }, 500_000); +}); diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index 0e8a18018eb..7ef130de9de 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -23,7 +23,7 @@ import { retryUntil } from '@aztec/foundation/retry'; import { sleep } from '@aztec/foundation/sleep'; import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { type BootstrapNode, InMemoryTxPool, MemoryEpochProofQuotePool, P2PClient } from '@aztec/p2p'; -import { createBootstrapNode, createTestLibP2PService } from '@aztec/p2p/mocks'; +import { createBootstrapNode, createTestLibP2PService } from '@aztec/p2p/test-helpers'; import { type PublicProcessorFactory } from '@aztec/simulator/server'; import { getTelemetryClient } from '@aztec/telemetry-client';