From c5a0c767cd1f8ac4ad1b495753af0da3d5b63644 Mon Sep 17 00:00:00 2001 From: harkamal Date: Sun, 3 Nov 2024 10:41:15 +0530 Subject: [PATCH 1/3] hookup the getblobs api to get bob and proof data from el remove unused fix import metrics overhault, test, debugging testing, some feeback fix add nethermind bug dicussion link fix resolve conflicts --- .../src/api/impl/beacon/blocks/index.ts | 10 +- .../src/chain/blocks/verifyBlock.ts | 2 +- .../blocks/verifyBlocksDataAvailability.ts | 28 +- .../src/eth1/provider/jsonRpcHttpClient.ts | 2 +- .../src/execution/engine/disabled.ts | 4 + .../beacon-node/src/execution/engine/http.ts | 57 +++- .../src/execution/engine/interface.ts | 4 +- .../beacon-node/src/execution/engine/mock.ts | 7 + .../beacon-node/src/execution/engine/types.ts | 19 ++ .../beacon-node/src/metrics/metrics/beacon.ts | 115 ++++++++ .../src/network/processor/gossipHandlers.ts | 3 + .../reqresp/beaconBlocksMaybeBlobsByRoot.ts | 169 +++++++++++- packages/beacon-node/src/sync/unknownBlock.ts | 17 +- .../unavailableBeaconBlobsByRoot.test.ts | 257 ++++++++++++++++++ packages/types/src/deneb/types.ts | 4 + 15 files changed, 663 insertions(+), 35 deletions(-) create mode 100644 packages/beacon-node/test/unit/network/unavailableBeaconBlobsByRoot.test.ts diff --git a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts index bda72bc3b997..66935df25b2b 100644 --- a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts @@ -215,12 +215,12 @@ export function getBeaconBlockApi({ // specification is very clear that this is the desired behaviour. // // i) Publish blobs and block before importing so that network can see them asap - // ii) publish blobs first because - // a) by the times nodes see block, they might decide to pull blobs - // b) they might require more hops to reach recipients in peerDAS kind of setup where - // blobs might need to hop between nodes because of partial subnet subscription - ...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)), + // ii) publish block first because + // a) as soon as node sees block they can start processing it while blobs arrive + // b) getting block first allows nodes to use getBlobs from local ELs and save + // import latency and hopefully bandwidth () => network.publishBeaconBlock(signedBlock) as Promise, + ...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)), () => // there is no rush to persist block since we published it to gossip anyway chain diff --git a/packages/beacon-node/src/chain/blocks/verifyBlock.ts b/packages/beacon-node/src/chain/blocks/verifyBlock.ts index 47a9a9060572..5ead67a720f7 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlock.ts +++ b/packages/beacon-node/src/chain/blocks/verifyBlock.ts @@ -106,7 +106,7 @@ export async function verifyBlocksInEpoch( } as SegmentExecStatus), // data availability for the blobs - verifyBlocksDataAvailability(this, blocksInput, opts), + verifyBlocksDataAvailability(this, blocksInput, abortController.signal, opts), // Run state transition only // TODO: Ensure it yields to allow flushing to workers and engine API diff --git a/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts b/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts index 1d8c72a46873..11e1d8f94567 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts +++ 
b/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts @@ -2,7 +2,7 @@ import {ChainForkConfig} from "@lodestar/config"; import {DataAvailabilityStatus} from "@lodestar/fork-choice"; import {computeTimeAtSlot} from "@lodestar/state-transition"; import {UintNum64, deneb} from "@lodestar/types"; -import {Logger} from "@lodestar/utils"; +import {ErrorAborted, Logger} from "@lodestar/utils"; import {Metrics} from "../../metrics/metrics.js"; import {BlockError, BlockErrorCode} from "../errors/index.js"; import {validateBlobSidecars} from "../validation/blobSidecar.js"; @@ -27,6 +27,7 @@ const BLOB_AVAILABILITY_TIMEOUT = 12_000; export async function verifyBlocksDataAvailability( chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger; metrics: Metrics | null}, blocks: BlockInput[], + signal: AbortSignal, opts: ImportBlockOpts ): Promise<{ dataAvailabilityStatuses: DataAvailabilityStatus[]; @@ -43,9 +44,12 @@ export async function verifyBlocksDataAvailability( const availableBlockInputs: BlockInput[] = []; for (const blockInput of blocks) { + if (signal.aborted) { + throw new ErrorAborted("verifyBlocksDataAvailability"); + } // Validate status of only not yet finalized blocks, we don't need yet to propogate the status // as it is not used upstream anywhere - const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, opts); + const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, signal, opts); dataAvailabilityStatuses.push(dataAvailabilityStatus); availableBlockInputs.push(availableBlockInput); } @@ -69,6 +73,7 @@ export async function verifyBlocksDataAvailability( async function maybeValidateBlobs( chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger}, blockInput: BlockInput, + signal: AbortSignal, opts: ImportBlockOpts ): Promise<{dataAvailabilityStatus: DataAvailabilityStatus; availableBlockInput: BlockInput}> { switch (blockInput.type) { @@ -92,7 +97,7 @@ async function maybeValidateBlobs( const blobsData = blockInput.type === BlockInputType.availableData ? blockInput.blockData - : await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise); + : await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise, signal); const {blobs} = blobsData; const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body; @@ -122,16 +127,21 @@ async function maybeValidateBlobs( async function raceWithCutoff( chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger}, blockInput: BlockInput, - availabilityPromise: Promise + availabilityPromise: Promise, + signal: AbortSignal ): Promise { const {block} = blockInput; const blockSlot = block.message.slot; - const cutoffTime = Math.max( - computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now(), - 0 - ); - const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTime)); + const cutoffTime = + computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now(); + const cutoffTimeout = + cutoffTime > 0 + ? 
new Promise((_resolve, reject) => { + setTimeout(() => reject(new Error("Timeout exceeded")), cutoffTime); + signal.addEventListener("abort", () => reject(signal.reason)); + }) + : Promise.reject(new Error("Cutoff time must be greater than 0")); chain.logger.debug("Racing for blob availabilityPromise", {blockSlot, cutoffTime}); try { diff --git a/packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts b/packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts index da72d5183d30..906c9cd883c7 100644 --- a/packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts +++ b/packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts @@ -372,7 +372,7 @@ export class HttpRpcError extends Error { /** * JSON RPC spec errors https://www.jsonrpc.org/specification#response_object */ -function parseJsonRpcErrorCode(code: number): string { +export function parseJsonRpcErrorCode(code: number): string { if (code === -32700) return "Parse request error"; if (code === -32600) return "Invalid request object"; if (code === -32601) return "Method not found"; diff --git a/packages/beacon-node/src/execution/engine/disabled.ts b/packages/beacon-node/src/execution/engine/disabled.ts index 68c72dc02ac6..dce9244ef7ab 100644 --- a/packages/beacon-node/src/execution/engine/disabled.ts +++ b/packages/beacon-node/src/execution/engine/disabled.ts @@ -28,4 +28,8 @@ export class ExecutionEngineDisabled implements IExecutionEngine { getPayloadBodiesByRange(): Promise { throw Error("Execution engine disabled"); } + + getBlobs(): Promise { + throw Error("Execution engine disabled"); + } } diff --git a/packages/beacon-node/src/execution/engine/http.ts b/packages/beacon-node/src/execution/engine/http.ts index c5cf61481b67..c9efbe22fcb0 100644 --- a/packages/beacon-node/src/execution/engine/http.ts +++ b/packages/beacon-node/src/execution/engine/http.ts @@ -1,14 +1,16 @@ import {Logger} from "@lodestar/logger"; import {ForkName, ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params"; import {ExecutionPayload, ExecutionRequests, Root, RootHex, Wei} from "@lodestar/types"; +import {BlobAndProof} from "@lodestar/types/deneb"; import { ErrorJsonRpcResponse, HttpRpcError, IJsonRpcHttpClient, JsonRpcHttpClientEvent, ReqOpts, + parseJsonRpcErrorCode, } from "../../eth1/provider/jsonRpcHttpClient.js"; -import {numToQuantity} from "../../eth1/provider/utils.js"; +import {bytesToData, numToQuantity} from "../../eth1/provider/utils.js"; import {Metrics} from "../../metrics/index.js"; import {EPOCHS_PER_BATCH} from "../../sync/constants.js"; import {getLodestarClientVersion} from "../../util/metadata.js"; @@ -31,6 +33,7 @@ import { EngineApiRpcReturnTypes, ExecutionPayloadBody, assertReqSizeLimit, + deserializeBlobAndProofs, deserializeExecutionPayloadBody, parseExecutionPayload, serializeBeaconBlockRoot, @@ -111,6 +114,7 @@ const getPayloadOpts: ReqOpts = {routeId: "getPayload"}; */ export class ExecutionEngineHttp implements IExecutionEngine { private logger: Logger; + private lastGetBlobsErrorTime = 0; // The default state is ONLINE, it will be updated to SYNCING once we receive the first payload // This assumption is better than the OFFLINE state, since we can't be sure if the EL is offline and being offline may trigger some notifications @@ -461,6 +465,57 @@ export class ExecutionEngineHttp implements IExecutionEngine { return response.map(deserializeExecutionPayloadBody); } + async getBlobs(_fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]> { + // retry only after a day may be + const 
GETBLOBS_RETRY_TIMEOUT = 256 * 12; + const timeNow = Date.now() / 1000; + const timeSinceLastFail = timeNow - this.lastGetBlobsErrorTime; + if (timeSinceLastFail < GETBLOBS_RETRY_TIMEOUT) { + // do not try getblobs since it might not be available + this.logger.debug( + `disabled engine_getBlobsV1 api call since last failed < GETBLOBS_RETRY_TIMEOUT=${GETBLOBS_RETRY_TIMEOUT}`, + timeSinceLastFail + ); + throw Error( + `engine_getBlobsV1 call recently failed timeSinceLastFail=${timeSinceLastFail} < GETBLOBS_RETRY_TIMEOUT=${GETBLOBS_RETRY_TIMEOUT}` + ); + } + + const method = "engine_getBlobsV1"; + assertReqSizeLimit(versionedHashes.length, 128); + const versionedHashesHex = versionedHashes.map(bytesToData); + let response = await this.rpc + .fetchWithRetries({ + method, + params: [versionedHashesHex], + }) + .catch((e) => { + if (e instanceof ErrorJsonRpcResponse && parseJsonRpcErrorCode(e.response.error.code) === "Method not found") { + this.lastGetBlobsErrorTime = timeNow; + this.logger.debug("disabling engine_getBlobsV1 api call since engine responded with method not availeble", { + retryTimeout: GETBLOBS_RETRY_TIMEOUT, + }); + } + throw e; + }); + + // handle nethermind buggy response + // see: https://discord.com/channels/595666850260713488/1293605631785304088/1298956894274060301 + if ( + (response as unknown as {blobsAndProofs: EngineApiRpcReturnTypes[typeof method]}).blobsAndProofs !== undefined + ) { + response = (response as unknown as {blobsAndProofs: EngineApiRpcReturnTypes[typeof method]}).blobsAndProofs; + } + + if (response.length !== versionedHashes.length) { + const error = `Invalid engine_getBlobsV1 response length=${response.length} versionedHashes=${versionedHashes.length}`; + this.logger.error(error); + throw Error(error); + } + + return response.map(deserializeBlobAndProofs); + } + private async getClientVersion(clientVersion: ClientVersion): Promise { const method = "engine_getClientVersionV1"; diff --git a/packages/beacon-node/src/execution/engine/interface.ts b/packages/beacon-node/src/execution/engine/interface.ts index 7bbf1bb27c1d..c32cc1bc7215 100644 --- a/packages/beacon-node/src/execution/engine/interface.ts +++ b/packages/beacon-node/src/execution/engine/interface.ts @@ -1,6 +1,6 @@ import {ForkName} from "@lodestar/params"; import {ExecutionPayload, ExecutionRequests, Root, RootHex, Wei, capella} from "@lodestar/types"; -import {Blob, KZGCommitment, KZGProof} from "@lodestar/types/deneb"; +import {Blob, BlobAndProof, KZGCommitment, KZGProof} from "@lodestar/types/deneb"; import {DATA} from "../../eth1/provider/utils.js"; import {PayloadId, PayloadIdCache, WithdrawalV1} from "./payloadIdCache.js"; @@ -179,4 +179,6 @@ export interface IExecutionEngine { getPayloadBodiesByHash(fork: ForkName, blockHash: DATA[]): Promise<(ExecutionPayloadBody | null)[]>; getPayloadBodiesByRange(fork: ForkName, start: number, count: number): Promise<(ExecutionPayloadBody | null)[]>; + + getBlobs(fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]>; } diff --git a/packages/beacon-node/src/execution/engine/mock.ts b/packages/beacon-node/src/execution/engine/mock.ts index 2331500624eb..8a84d2b0148e 100644 --- a/packages/beacon-node/src/execution/engine/mock.ts +++ b/packages/beacon-node/src/execution/engine/mock.ts @@ -99,6 +99,7 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend { engine_getPayloadBodiesByHashV1: this.getPayloadBodiesByHash.bind(this), engine_getPayloadBodiesByRangeV1: this.getPayloadBodiesByRange.bind(this), 
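// Note on the mock wiring below: engine_getBlobsV1 (added in this patch) is registered in this
// handler map, and the mock's getBlobs() answers null for every requested versioned hash, so
// callers exercise their network-fallback path rather than the happy path.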
engine_getClientVersionV1: this.getClientVersionV1.bind(this), + engine_getBlobsV1: this.getBlobs.bind(this), }; } @@ -397,6 +398,12 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend { return [{code: ClientCode.XX, name: "mock", version: "", commit: ""}]; } + private getBlobs( + versionedHashes: EngineApiRpcParamTypes["engine_getBlobsV1"][0] + ): EngineApiRpcReturnTypes["engine_getBlobsV1"] { + return versionedHashes.map((_vh) => null); + } + private timestampToFork(timestamp: number): ForkExecution { if (timestamp > (this.opts.electraForkTimestamp ?? Infinity)) return ForkName.electra; if (timestamp > (this.opts.denebForkTimestamp ?? Infinity)) return ForkName.deneb; diff --git a/packages/beacon-node/src/execution/engine/types.ts b/packages/beacon-node/src/execution/engine/types.ts index 9af2d8d1ce10..f35a63aa3d96 100644 --- a/packages/beacon-node/src/execution/engine/types.ts +++ b/packages/beacon-node/src/execution/engine/types.ts @@ -6,6 +6,7 @@ import { ForkSeq, } from "@lodestar/params"; import {ExecutionPayload, ExecutionRequests, Root, Wei, bellatrix, capella, deneb, electra, ssz} from "@lodestar/types"; +import {BlobAndProof} from "@lodestar/types/deneb"; import { DATA, @@ -67,6 +68,8 @@ export type EngineApiRpcParamTypes = { * Object - Instance of ClientVersion */ engine_getClientVersionV1: [ClientVersionRpc]; + + engine_getBlobsV1: [DATA[]]; }; export type PayloadStatus = { @@ -109,6 +112,8 @@ export type EngineApiRpcReturnTypes = { engine_getPayloadBodiesByRangeV1: (ExecutionPayloadBodyRpc | null)[]; engine_getClientVersionV1: ClientVersionRpc[]; + + engine_getBlobsV1: (BlobAndProofRpc | null)[]; }; type ExecutionPayloadRpcWithValue = { @@ -171,6 +176,11 @@ export type DepositRequestsRpc = DATA; export type WithdrawalRequestsRpc = DATA; export type ConsolidationRequestsRpc = DATA; +export type BlobAndProofRpc = { + blob: DATA; + proof: DATA; +}; + export type VersionedHashesRpc = DATA[]; export type PayloadAttributesRpc = { @@ -462,6 +472,15 @@ export function serializeExecutionPayloadBody(data: ExecutionPayloadBody | null) : null; } +export function deserializeBlobAndProofs(data: BlobAndProofRpc | null): BlobAndProof | null { + return data + ? 
{ + blob: dataToBytes(data.blob, BYTES_PER_FIELD_ELEMENT * FIELD_ELEMENTS_PER_BLOB), + proof: dataToBytes(data.proof, 48), + } + : null; +} + export function assertReqSizeLimit(blockHashesReqCount: number, count: number): void { if (blockHashesReqCount > count) { throw new Error(`Requested blocks must not be > ${count}`); diff --git a/packages/beacon-node/src/metrics/metrics/beacon.ts b/packages/beacon-node/src/metrics/metrics/beacon.ts index 0347aef8957b..f572ec0d3c1f 100644 --- a/packages/beacon-node/src/metrics/metrics/beacon.ts +++ b/packages/beacon-node/src/metrics/metrics/beacon.ts @@ -220,6 +220,121 @@ export function createBeaconMetrics(register: RegistryMetricCreator) { }), }, + blockInputFetchStats: { + // of already available blocks which didn't have to go through blobs pull + totalDataAvailableBlockInputs: register.gauge({ + name: "beacon_blockinputs_already_available_total", + help: "Total number of block inputs whose blobs were already available", + }), + totalDataAvailableBlockInputBlobs: register.gauge({ + name: "beacon_blockinput_blobs_already_available_total", + help: "Total number of block input blobs that of already available blocks", + }), + + // of those which need to be fetched + dataPromiseBlobsAlreadyAvailable: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_already_available_total", + help: "Count of blocks that were already available in blockinput cache via gossip", + }), + dataPromiseBlobsDelayedGossipAvailable: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_delayed_gossip_available_total", + help: "Count of blobs that became available delayed via gossip post block arrival", + }), + dataPromiseBlobsDeplayedGossipAvailableSavedGetBlobsCompute: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_delayed_gossip_saved_computation_total", + help: "Count of late available blobs that saved blob sidecar computation from getblobs", + }), + dataPromiseBlobsFoundInGetBlobsCacheNotNull: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_found_nonnull_in_getblobs_cache_total", + help: "Count of blobs that were found not null in getblobs cache", + }), + dataPromiseBlobsFoundInGetBlobsCacheNull: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_found_null_in_getblobs_cache_total", + help: "Count of blobs that were found null in the getblobs cache", + }), + dataPromiseBlobsNotAvailableInGetBlobsCache: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_notfound_in_getblobs_cache_total", + help: "Count of blobs that were newly seen and hence in not getblobs cache", + }), + dataPromiseBlobsEngineGetBlobsApiRequests: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_queried_in_getblobs_api_total", + help: "Total number of blobs requested to the getblobs api", + }), + dataPromiseBlobsEngineGetBlobsApiNotNull: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_responded_nonnull_in_getblobs_api_total", + help: "Count of successful engine API responses that were not null", + }), + dataPromiseBlobsEngineGetBlobsApiNull: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_responded_null_in_getblobs_api_total", + help: "Count of engine API responses that were null", + }), + dataPromiseBlobsEngineApiGetBlobsErroredNull: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_errored_as_null_in_getblobs_api_total", + help: "Number of responses marked null due to errors in getblobs api", + }), + dataPromiseBlobsEngineApiGetBlobsUseful: register.gauge({ + name: 
"beacon_datapromise_blockinput_getblobs_api_nonnull_responses_used_total", + help: "Count of successful non null engine API responses that were found useful", + }), + dataPromiseBlobsFinallyQueriedFromNetwork: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_finally_queried_from_network_total", + help: "Number of blob requests finally sent to the network", + }), + dataPromiseBlobsFinallyAvailableFromNetwork: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_finally_resolved_from_network_total", + help: "Number of blobs successfully fetched from the network", + }), + + totalDataPromiseBlockInputsAvailableUsingGetBlobs: register.gauge({ + name: "beacon_datapromise_blockinputs_available_using_getblobs_total", + help: "Count of block inputs that became available using non-null get blobs requests", + }), + totalDataPromiseBlockInputsTried: register.gauge({ + name: "beacon_datapromise_blockinputs_tried_for_blobs_pull_total", + help: "Total number of block inputs that were tried to resolve", + }), + totalDataPromiseBlockInputsResolvedAvailable: register.gauge({ + name: "beacon_datapromise_blockinputs_available_post_blobs_pull_total", + help: "Total number of block inputs that were successfully resolved as available on blobs pull", + }), + + // retry counts + totalDataPromiseBlockInputsReTried: register.gauge({ + name: "beacon_datapromise_blockinputs_retried_for_blobs_pull_total", + help: "Total number of block inputs that were retried for blobs pull from network", + }), + dataPromiseBlobsRetriedFromNetwork: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_retried_from_network_total", + help: "Number of blob requests required from the network on retries", + }), + dataPromiseBlobsRetriedAvailableFromNetwork: register.gauge({ + name: "beacon_datapromise_blockinput_blobs_retried_and_resolved_from_network_total", + help: "Number of blobs successfully fetched from the network on retries", + }), + totalDataPromiseBlockInputsRetriedAvailableFromNetwork: register.gauge({ + name: "beacon_datapromise_blockinputs_retried_and_resolved_from_network_total", + help: "Number of blockinputs successfully resolved from the network on retries", + }), + + // some caches stats + getBlobsCacheSize: register.gauge({ + name: "getblob_cache_size", + help: "getBlobs cache size", + }), + getBlobsCachePruned: register.gauge({ + name: "getblob_cache_pruned_total", + help: "getblobs cache pruned count", + }), + dataPromiseBlockInputRetryTrackerCacheSize: register.gauge({ + name: "beacon_datapromise_blockinput_retry_tracker_cache_size", + help: "datapromise retry tracker cache size", + }), + dataPromiseBlockInputRetryTrackerCachePruned: register.gauge({ + name: "beacon_datapromise_blockinput_retry_tracker_cache_pruned", + help: "datapromise retry tracker cache pruned count", + }), + }, + // Non-spec'ed clockSlot: register.gauge({ name: "beacon_clock_slot", diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index 5bec263a45c8..029e7ae4db42 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -253,6 +253,9 @@ function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHand // if blobs are not yet fully available start an aggressive blob pull if (blockInput.type === BlockInputType.dataPromise) { events.emit(NetworkEvent.unknownBlockInput, {blockInput, peer: peerIdStr}); + } else if (blockInput.type === 
BlockInputType.availableData) { + metrics?.blockInputFetchStats.totalDataAvailableBlockInputs.inc(); + metrics?.blockInputFetchStats.totalDataAvailableBlockInputBlobs.inc(blockInput.blockData.blobs.length); } chain diff --git a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts index a6919318999d..d0c3b10cfbbf 100644 --- a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts +++ b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts @@ -1,6 +1,9 @@ +import {toHexString} from "@chainsafe/ssz"; import {ChainForkConfig} from "@lodestar/config"; -import {ForkSeq} from "@lodestar/params"; -import {SignedBeaconBlock, deneb, phase0} from "@lodestar/types"; +import {ForkName, ForkSeq} from "@lodestar/params"; +import {signedBlockToSignedHeader} from "@lodestar/state-transition"; +import {RootHex, SignedBeaconBlock, deneb, phase0} from "@lodestar/types"; +import {BlobAndProof} from "@lodestar/types/deneb"; import {fromHex} from "@lodestar/utils"; import { BlobsSource, @@ -13,11 +16,17 @@ import { getBlockInputBlobs, } from "../../chain/blocks/types.js"; import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js"; +import {IExecutionEngine} from "../../execution/index.js"; import {Metrics} from "../../metrics/index.js"; +import {computeInclusionProof, kzgCommitmentToVersionedHash} from "../../util/blobs.js"; import {PeerIdStr} from "../../util/peerId.js"; import {INetwork} from "../interface.js"; import {matchBlockWithBlobs} from "./beaconBlocksMaybeBlobsByRange.js"; +// keep 1 epoch of stuff, assmume 16 blobs +const MAX_ENGINE_GETBLOBS_CACHE = 32 * 16; +const MAX_UNAVAILABLE_RETRY_CACHE = 32; + export async function beaconBlocksMaybeBlobsByRoot( config: ChainForkConfig, network: INetwork, @@ -35,6 +44,7 @@ export async function beaconBlocksMaybeBlobsByRoot( if (ForkSeq[fork] >= ForkSeq.deneb) { const blobKzgCommitmentsLen = (block.data.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length; for (let index = 0; index < blobKzgCommitmentsLen; index++) { + // try see if the blob is available locally blobIdentifiers.push({blockRoot, index}); } } @@ -57,8 +67,14 @@ export async function unavailableBeaconBlobsByRoot( network: INetwork, peerId: PeerIdStr, unavailableBlockInput: BlockInput | NullBlockInput, - metrics: Metrics | null + opts: { + metrics: Metrics | null; + executionEngine: IExecutionEngine; + engineGetBlobsCache?: Map; + blockInputsRetryTrackerCache?: Set; + } ): Promise { + const {executionEngine, metrics, engineGetBlobsCache, blockInputsRetryTrackerCache} = opts; if (unavailableBlockInput.block !== null && unavailableBlockInput.type !== BlockInputType.dataPromise) { return unavailableBlockInput; } @@ -81,26 +97,150 @@ export async function unavailableBeaconBlobsByRoot( } // resolve missing blobs - const blobIdentifiers: deneb.BlobIdentifier[] = []; const slot = block.message.slot; + const fork = config.getForkName(slot); const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.message); + const blockRootHex = toHexString(blockRoot); + + const blockTriedBefore = blockInputsRetryTrackerCache?.has(blockRootHex) === true; + if (blockTriedBefore) { + metrics?.blockInputFetchStats.totalDataPromiseBlockInputsReTried.inc(); + } else { + metrics?.blockInputFetchStats.totalDataPromiseBlockInputsTried.inc(); + blockInputsRetryTrackerCache?.add(blockRootHex); + } const blobKzgCommitmentsLen = 
(block.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length; + const signedBlockHeader = signedBlockToSignedHeader(config, block); + + const engineReqIdentifiers: (deneb.BlobIdentifier & { + kzgCommitment: deneb.KZGCommitment; + versionedHash: Uint8Array; + })[] = []; + const networkReqIdentifiers: deneb.BlobIdentifier[] = []; + + let getBlobsUseful = false; for (let index = 0; index < blobKzgCommitmentsLen; index++) { - if (blobsCache.has(index) === false) blobIdentifiers.push({blockRoot, index}); + if (blobsCache.has(index) === false) { + const kzgCommitment = (block.message.body as deneb.BeaconBlockBody).blobKzgCommitments[index]; + const versionedHash = kzgCommitmentToVersionedHash(kzgCommitment); + + // check if the getblobs cache has the data if block not been queried before + if (engineGetBlobsCache?.has(toHexString(versionedHash)) === true && !blockTriedBefore) { + const catchedBlobAndProof = engineGetBlobsCache.get(toHexString(versionedHash)) ?? null; + if (catchedBlobAndProof === null) { + metrics?.blockInputFetchStats.dataPromiseBlobsFoundInGetBlobsCacheNull.inc(); + networkReqIdentifiers.push({blockRoot, index}); + } else { + metrics?.blockInputFetchStats.dataPromiseBlobsFoundInGetBlobsCacheNotNull.inc(); + // compute TODO: also add inclusion proof cache + const {blob, proof: kzgProof} = catchedBlobAndProof; + const kzgCommitmentInclusionProof = computeInclusionProof(fork, block.message.body, index); + const blobSidecar = {index, blob, kzgCommitment, kzgProof, signedBlockHeader, kzgCommitmentInclusionProof}; + blobsCache.set(blobSidecar.index, {blobSidecar, blobBytes: null}); + } + } else if (blockTriedBefore) { + // only retry it from network + networkReqIdentifiers.push({blockRoot, index}); + } else { + // see if we can pull from EL + metrics?.blockInputFetchStats.dataPromiseBlobsNotAvailableInGetBlobsCache.inc(); + engineReqIdentifiers.push({blockRoot, index, versionedHash, kzgCommitment}); + } + } else { + metrics?.blockInputFetchStats.dataPromiseBlobsAlreadyAvailable.inc(); + } } - let allBlobSidecars: deneb.BlobSidecar[]; - if (blobIdentifiers.length > 0) { - allBlobSidecars = await network.sendBlobSidecarsByRoot(peerId, blobIdentifiers); + const versionedHashes = engineReqIdentifiers.map((bi) => bi.versionedHash); + metrics?.blockInputFetchStats.dataPromiseBlobsEngineGetBlobsApiRequests.inc(versionedHashes.length); + + const blobAndProofs = await executionEngine.getBlobs(ForkName.deneb, versionedHashes).catch((_e) => { + metrics?.blockInputFetchStats.dataPromiseBlobsEngineApiGetBlobsErroredNull.inc(versionedHashes.length); + return versionedHashes.map((_vh) => null); + }); + + for (let j = 0; j < versionedHashes.length; j++) { + const blobAndProof = blobAndProofs[j] ?? 
null; + // save to cache for future reference + engineGetBlobsCache?.set(toHexString(versionedHashes[j]), blobAndProof); + if (blobAndProof !== null) { + metrics?.blockInputFetchStats.dataPromiseBlobsEngineGetBlobsApiNotNull.inc(); + + // if we already got it by now, save the compute + if (blobsCache.has(engineReqIdentifiers[j].index) === false) { + metrics?.blockInputFetchStats.dataPromiseBlobsEngineApiGetBlobsUseful.inc(); + getBlobsUseful = true; + const {blob, proof: kzgProof} = blobAndProof; + const {kzgCommitment, index} = engineReqIdentifiers[j]; + const kzgCommitmentInclusionProof = computeInclusionProof(fork, block.message.body, index); + const blobSidecar = {index, blob, kzgCommitment, kzgProof, signedBlockHeader, kzgCommitmentInclusionProof}; + // add them in cache so that its reflected in all the blockInputs that carry this + // for e.g. a blockInput that might be awaiting blobs promise fullfillment in + // verifyBlocksDataAvailability + blobsCache.set(blobSidecar.index, {blobSidecar, blobBytes: null}); + } else { + metrics?.blockInputFetchStats.dataPromiseBlobsDelayedGossipAvailable.inc(); + metrics?.blockInputFetchStats.dataPromiseBlobsDeplayedGossipAvailableSavedGetBlobsCompute.inc(); + } + } + // may be blobsidecar arrived in the timespan of making the request + else { + metrics?.blockInputFetchStats.dataPromiseBlobsEngineGetBlobsApiNull.inc(); + if (blobsCache.has(engineReqIdentifiers[j].index) === false) { + const {blockRoot, index} = engineReqIdentifiers[j]; + networkReqIdentifiers.push({blockRoot, index}); + } else { + metrics?.blockInputFetchStats.dataPromiseBlobsDelayedGossipAvailable.inc(); + } + } + } + + if (engineGetBlobsCache !== undefined) { + // prune out engineGetBlobsCache + let pruneLength = Math.max(0, engineGetBlobsCache?.size - MAX_ENGINE_GETBLOBS_CACHE); + for (const key of engineGetBlobsCache.keys()) { + if (pruneLength <= 0) break; + engineGetBlobsCache.delete(key); + pruneLength--; + metrics?.blockInputFetchStats.getBlobsCachePruned.inc(); + } + metrics?.blockInputFetchStats.getBlobsCacheSize.set(engineGetBlobsCache.size); + } + if (blockInputsRetryTrackerCache !== undefined) { + // prune out engineGetBlobsCache + let pruneLength = Math.max(0, blockInputsRetryTrackerCache?.size - MAX_UNAVAILABLE_RETRY_CACHE); + for (const key of blockInputsRetryTrackerCache.keys()) { + if (pruneLength <= 0) break; + blockInputsRetryTrackerCache.delete(key); + pruneLength--; + metrics?.blockInputFetchStats.dataPromiseBlockInputRetryTrackerCachePruned.inc(); + } + metrics?.blockInputFetchStats.dataPromiseBlockInputRetryTrackerCacheSize.set(blockInputsRetryTrackerCache.size); + } + + // if clients expect sorted identifiers + networkReqIdentifiers.sort((a, b) => a.index - b.index); + let networkResBlobSidecars: deneb.BlobSidecar[]; + metrics?.blockInputFetchStats.dataPromiseBlobsFinallyQueriedFromNetwork.inc(networkReqIdentifiers.length); + if (blockTriedBefore) { + metrics?.blockInputFetchStats.dataPromiseBlobsRetriedFromNetwork.inc(networkReqIdentifiers.length); + } + + if (networkReqIdentifiers.length > 0) { + networkResBlobSidecars = await network.sendBlobSidecarsByRoot(peerId, networkReqIdentifiers); + metrics?.blockInputFetchStats.dataPromiseBlobsFinallyAvailableFromNetwork.inc(networkResBlobSidecars.length); + if (blockTriedBefore) { + metrics?.blockInputFetchStats.dataPromiseBlobsRetriedAvailableFromNetwork.inc(networkReqIdentifiers.length); + } } else { - allBlobSidecars = []; + networkResBlobSidecars = []; } // add them in cache so that its reflected in all the 
blockInputs that carry this // for e.g. a blockInput that might be awaiting blobs promise fullfillment in // verifyBlocksDataAvailability - for (const blobSidecar of allBlobSidecars) { + for (const blobSidecar of networkResBlobSidecars) { blobsCache.set(blobSidecar.index, {blobSidecar, blobBytes: null}); } @@ -114,5 +254,14 @@ export async function unavailableBeaconBlobsByRoot( const blockData = {fork: cachedData.fork, ...allBlobs, blobsSource: BlobsSource.byRoot} as BlockInputDataBlobs; resolveAvailability(blockData); metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.UNKNOWN_SYNC}); + + metrics?.blockInputFetchStats.totalDataAvailableBlockInputs.inc(); + if (getBlobsUseful) { + metrics?.blockInputFetchStats.totalDataPromiseBlockInputsAvailableUsingGetBlobs.inc(); + } + if (blockTriedBefore) { + metrics?.blockInputFetchStats.totalDataPromiseBlockInputsRetriedAvailableFromNetwork.inc(); + } + return getBlockInput.availableData(config, block, BlockSource.byRoot, blockBytes, blockData); } diff --git a/packages/beacon-node/src/sync/unknownBlock.ts b/packages/beacon-node/src/sync/unknownBlock.ts index c9652dc6ecd2..f1fa44750f4b 100644 --- a/packages/beacon-node/src/sync/unknownBlock.ts +++ b/packages/beacon-node/src/sync/unknownBlock.ts @@ -1,6 +1,7 @@ import {ChainForkConfig} from "@lodestar/config"; import {INTERVALS_PER_SLOT} from "@lodestar/params"; import {Root, RootHex, deneb} from "@lodestar/types"; +import {BlobAndProof} from "@lodestar/types/deneb"; import {Logger, fromHex, pruneSetToMax, toRootHex} from "@lodestar/utils"; import {sleep} from "@lodestar/utils"; import {BlockInput, BlockInputType, NullBlockInput} from "../chain/blocks/types.js"; @@ -34,6 +35,9 @@ export class UnknownBlockSync { private readonly maxPendingBlocks; private subscribedToNetworkEvents = false; + private engineGetBlobsCache = new Map(); + private blockInputsRetryTrackerCache = new Set(); + constructor( private readonly config: ChainForkConfig, private readonly network: INetwork, @@ -532,13 +536,12 @@ export class UnknownBlockSync { for (let i = 0; i < MAX_ATTEMPTS_PER_BLOCK; i++) { const peer = shuffledPeers[i % shuffledPeers.length]; try { - const blockInput = await unavailableBeaconBlobsByRoot( - this.config, - this.network, - peer, - unavailableBlockInput, - this.metrics - ); + const blockInput = await unavailableBeaconBlobsByRoot(this.config, this.network, peer, unavailableBlockInput, { + metrics: this.metrics, + executionEngine: this.chain.executionEngine, + engineGetBlobsCache: this.engineGetBlobsCache, + blockInputsRetryTrackerCache: this.blockInputsRetryTrackerCache, + }); // Peer does not have the block, try with next peer if (blockInput === undefined) { diff --git a/packages/beacon-node/test/unit/network/unavailableBeaconBlobsByRoot.test.ts b/packages/beacon-node/test/unit/network/unavailableBeaconBlobsByRoot.test.ts new file mode 100644 index 000000000000..b222b2ac38d8 --- /dev/null +++ b/packages/beacon-node/test/unit/network/unavailableBeaconBlobsByRoot.test.ts @@ -0,0 +1,257 @@ +import {toHexString} from "@chainsafe/ssz"; +import {createBeaconConfig, createChainForkConfig, defaultChainConfig} from "@lodestar/config"; +import {BYTES_PER_FIELD_ELEMENT, FIELD_ELEMENTS_PER_BLOB, ForkBlobs, ForkName, isForkBlobs} from "@lodestar/params"; +import {signedBlockToSignedHeader} from "@lodestar/state-transition"; +import {SignedBeaconBlock, deneb, ssz} from "@lodestar/types"; +import {beforeAll, describe, expect, it, vi} from "vitest"; +import { + BlobsSource, + 
BlockInput, + BlockInputDataBlobs, + BlockInputType, + BlockSource, + CachedData, + getBlockInput, +} from "../../../src/chain/blocks/types.js"; +import {IExecutionEngine} from "../../../src/execution/index.js"; +import {INetwork} from "../../../src/network/interface.js"; +import {unavailableBeaconBlobsByRoot} from "../../../src/network/reqresp/index.js"; +import {computeInclusionProof, kzgCommitmentToVersionedHash} from "../../../src/util/blobs.js"; +import {ckzg} from "../../../src/util/kzg.js"; +import {initCKZG, loadEthereumTrustedSetup} from "../../../src/util/kzg.js"; + +describe("unavailableBeaconBlobsByRoot", () => { + beforeAll(async () => { + await initCKZG(); + loadEthereumTrustedSetup(); + }); + + /* eslint-disable @typescript-eslint/naming-convention */ + const chainConfig = createChainForkConfig({ + ...defaultChainConfig, + ALTAIR_FORK_EPOCH: 0, + BELLATRIX_FORK_EPOCH: 0, + CAPELLA_FORK_EPOCH: 0, + DENEB_FORK_EPOCH: 0, + }); + const genesisValidatorsRoot = Buffer.alloc(32, 0xaa); + const config = createBeaconConfig(chainConfig, genesisValidatorsRoot); + + const executionEngine = { + getBlobs: vi.fn(), + }; + + const network = { + sendBeaconBlocksByRoot: vi.fn(), + sendBlobSidecarsByRoot: vi.fn(), + }; + + const peerId = "mockPeerId"; + const engineGetBlobsCache = new Map(); + + it("should successfully resolve all blobs from engine and network", async () => { + // Simulate a block 1 with 5 blobs + const signedBlock = ssz.deneb.SignedBeaconBlock.defaultValue(); + signedBlock.message.slot = 1; + const blobscommitmentsandproofs = generateBlobs(5); + signedBlock.message.body.blobKzgCommitments.push(...blobscommitmentsandproofs.kzgCommitments); + const blockheader = signedBlockToSignedHeader(config, signedBlock); + + const unavailableBlockInput = { + block: signedBlock, + source: BlockSource.gossip, + blockBytes: null, + type: BlockInputType.dataPromise, + cachedData: getEmptyBlockInputCacheEntry(ForkName.deneb).cachedData, + } as BlockInput; + + // total of 5 blobs + // blob 0. not in cache & to resolved by getBlobs + // blob 1. not in cache & to resolved by getBlobs + // blob 2. to be found in engineGetBlobsCache + // blob 3. null cached earlier so should directly go to network query and skip engine query + // blob 4. 
to hit getBlobs first with null response and then go to the network query + // + // engineGetBlobsCache caches 2 fully, and null for 3 + // getBlobs should see 0,1,4 and return first two non null and last null + // network should see 3,4 + + engineGetBlobsCache.set(toHexString(blobscommitmentsandproofs.blobVersionedHashes[2]), { + blob: blobscommitmentsandproofs.blobs[2], + proof: blobscommitmentsandproofs.kzgProofs[2], + }); + engineGetBlobsCache.set(toHexString(blobscommitmentsandproofs.blobVersionedHashes[3]), null); + + // Mock execution engine to return 2 blobs + executionEngine.getBlobs.mockResolvedValueOnce([ + { + blob: blobscommitmentsandproofs.blobs[0], + proof: blobscommitmentsandproofs.kzgProofs[0], + }, + { + blob: blobscommitmentsandproofs.blobs[1], + proof: blobscommitmentsandproofs.kzgProofs[1], + }, + null, + ]); + + // Mock network to return 2 blobs + network.sendBlobSidecarsByRoot.mockResolvedValueOnce([ + { + index: 3, + blob: blobscommitmentsandproofs.blobs[3], + kzgCommitment: blobscommitmentsandproofs.kzgCommitments[3], + kzgProof: blobscommitmentsandproofs.kzgProofs[3], + signedBlockHeader: blockheader, + kzgCommitmentInclusionProof: computeInclusionProof(ForkName.deneb, signedBlock.message.body, 3), + }, + { + index: 4, + blob: blobscommitmentsandproofs.blobs[4], + kzgCommitment: blobscommitmentsandproofs.kzgCommitments[4], + kzgProof: blobscommitmentsandproofs.kzgProofs[4], + signedBlockHeader: blockheader, + kzgCommitmentInclusionProof: computeInclusionProof(ForkName.deneb, signedBlock.message.body, 4), + }, + ]); + + const result = await unavailableBeaconBlobsByRoot( + config, + network as unknown as INetwork, + peerId, + unavailableBlockInput, + { + executionEngine: executionEngine as unknown as IExecutionEngine, + metrics: null, + engineGetBlobsCache, + } + ); + + // Check if all blobs are aggregated + const allBlobs = [ + { + index: 0, + blob: blobscommitmentsandproofs.blobs[0], + kzgCommitment: blobscommitmentsandproofs.kzgCommitments[0], + kzgProof: blobscommitmentsandproofs.kzgProofs[0], + signedBlockHeader: blockheader, + kzgCommitmentInclusionProof: computeInclusionProof(ForkName.deneb, signedBlock.message.body, 0), + }, + { + index: 1, + blob: blobscommitmentsandproofs.blobs[1], + kzgCommitment: blobscommitmentsandproofs.kzgCommitments[1], + kzgProof: blobscommitmentsandproofs.kzgProofs[1], + signedBlockHeader: blockheader, + kzgCommitmentInclusionProof: computeInclusionProof(ForkName.deneb, signedBlock.message.body, 1), + }, + { + index: 2, + blob: blobscommitmentsandproofs.blobs[2], + kzgCommitment: blobscommitmentsandproofs.kzgCommitments[2], + kzgProof: blobscommitmentsandproofs.kzgProofs[2], + signedBlockHeader: blockheader, + kzgCommitmentInclusionProof: computeInclusionProof(ForkName.deneb, signedBlock.message.body, 2), + }, + { + index: 3, + blob: blobscommitmentsandproofs.blobs[3], + kzgCommitment: blobscommitmentsandproofs.kzgCommitments[3], + kzgProof: blobscommitmentsandproofs.kzgProofs[3], + signedBlockHeader: blockheader, + kzgCommitmentInclusionProof: computeInclusionProof(ForkName.deneb, signedBlock.message.body, 3), + }, + { + index: 4, + blob: blobscommitmentsandproofs.blobs[4], + kzgCommitment: blobscommitmentsandproofs.kzgCommitments[4], + kzgProof: blobscommitmentsandproofs.kzgProofs[4], + signedBlockHeader: blockheader, + kzgCommitmentInclusionProof: computeInclusionProof(ForkName.deneb, signedBlock.message.body, 4), + }, + ]; + + const blockData = { + fork: ForkName.deneb as ForkBlobs, + blobs: allBlobs, + blobsBytes: [null, null, 
null, null, null], + blobsSource: BlobsSource.byRoot, + }; + const resolvedBlobs = getBlockInput.availableData(config, signedBlock, BlockSource.byRoot, null, blockData); + + const engineReqIdentifiers = [...blobscommitmentsandproofs.blobVersionedHashes]; + // versionedHashes: 1,2,4 + engineReqIdentifiers.splice(2, 2); + expect(result).toBeDefined(); + expect(executionEngine.getBlobs).toHaveBeenCalledWith("deneb", engineReqIdentifiers); + expect(result).toEqual(resolvedBlobs); + }); +}); + +type BlockInputCacheType = { + fork: ForkName; + block?: SignedBeaconBlock; + blockBytes?: Uint8Array | null; + cachedData?: CachedData; + // block promise and its callback cached for delayed resolution + blockInputPromise: Promise; + resolveBlockInput: (blockInput: BlockInput) => void; +}; + +function getEmptyBlockInputCacheEntry(fork: ForkName): BlockInputCacheType { + // Capture both the promise and its callbacks for blockInput and final availability + // It is not spec'ed but in tests in Firefox and NodeJS the promise constructor is run immediately + let resolveBlockInput: ((block: BlockInput) => void) | null = null; + const blockInputPromise = new Promise((resolveCB) => { + resolveBlockInput = resolveCB; + }); + if (resolveBlockInput === null) { + throw Error("Promise Constructor was not executed immediately"); + } + if (!isForkBlobs(fork)) { + return {fork, blockInputPromise, resolveBlockInput}; + } + + let resolveAvailability: ((blobs: BlockInputDataBlobs) => void) | null = null; + const availabilityPromise = new Promise((resolveCB) => { + resolveAvailability = resolveCB; + }); + + if (resolveAvailability === null) { + throw Error("Promise Constructor was not executed immediately"); + } + + const blobsCache = new Map(); + const cachedData: CachedData = {fork, blobsCache, availabilityPromise, resolveAvailability}; + return {fork, blockInputPromise, resolveBlockInput, cachedData}; +} + +function generateBlobs(count: number): { + blobs: Uint8Array[]; + kzgCommitments: Uint8Array[]; + blobVersionedHashes: Uint8Array[]; + kzgProofs: Uint8Array[]; +} { + const blobs = Array.from({length: count}, (_, index) => generateRandomBlob(index)); + const kzgCommitments = blobs.map((blob) => ckzg.blobToKzgCommitment(blob)); + const versionedHash = kzgCommitments.map((kzgCommitment) => kzgCommitmentToVersionedHash(kzgCommitment)); + const kzgProofs = blobs.map((blob, index) => ckzg.computeBlobKzgProof(blob, kzgCommitments[index])); + + return { + blobs, + kzgCommitments, + blobVersionedHashes: versionedHash.map((hash) => hash), + kzgProofs, + }; +} + +function generateRandomBlob(index: number): deneb.Blob { + const blob = new Uint8Array(FIELD_ELEMENTS_PER_BLOB * BYTES_PER_FIELD_ELEMENT); + const dv = new DataView(blob.buffer, blob.byteOffset, blob.byteLength); + + for (let i = 0; i < FIELD_ELEMENTS_PER_BLOB; i++) { + // Generate a unique value based on the index + dv.setUint32(i * BYTES_PER_FIELD_ELEMENT, index + i); + } + return blob; +} diff --git a/packages/types/src/deneb/types.ts b/packages/types/src/deneb/types.ts index 7ee6648aeaf2..1bbabd0e4285 100644 --- a/packages/types/src/deneb/types.ts +++ b/packages/types/src/deneb/types.ts @@ -51,3 +51,7 @@ export type ProducedBlobSidecars = Omit; export type SignedBlockContents = ValueOf; export type Contents = Omit; +export type BlobAndProof = { + blob: Blob; + proof: KZGProof; +}; From cd0d850f6ad8418fc6ee1fae3eff6ffb52b6da91 Mon Sep 17 00:00:00 2001 From: harkamal Date: Sun, 3 Nov 2024 12:47:12 +0530 Subject: [PATCH 2/3] deblobs timeout --- 
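Notes: the previous constant 256 * 12 only paused engine_getBlobsV1 calls for 3,072 s (about 51 minutes); 256 * 32 * 12 is 98,304 s (about 27 hours, i.e. 256 epochs x 32 slots x 12 s per slot), which matches the "retry only after a day" comment in http.ts. A minimal sketch of the cooldown this constant drives, assuming Date.now() / 1000 as the clock; shouldSkipGetBlobs and onGetBlobsMethodNotFound are hypothetical helper names used only for illustration, not part of the patch:

    // Skip engine_getBlobsV1 for GETBLOBS_RETRY_TIMEOUT seconds after the EL
    // answered "Method not found" (JSON-RPC error -32601) for it.
    const GETBLOBS_RETRY_TIMEOUT = 256 * 32 * 12; // seconds, ~27h

    let lastGetBlobsErrorTime = 0; // unix seconds of the last "Method not found" failure

    function onGetBlobsMethodNotFound(nowSec: number): void {
      lastGetBlobsErrorTime = nowSec;
    }

    function shouldSkipGetBlobs(nowSec: number): boolean {
      return nowSec - lastGetBlobsErrorTime < GETBLOBS_RETRY_TIMEOUT;
    }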
packages/beacon-node/src/execution/engine/http.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/beacon-node/src/execution/engine/http.ts b/packages/beacon-node/src/execution/engine/http.ts index c9efbe22fcb0..ea064d2fe816 100644 --- a/packages/beacon-node/src/execution/engine/http.ts +++ b/packages/beacon-node/src/execution/engine/http.ts @@ -467,7 +467,7 @@ export class ExecutionEngineHttp implements IExecutionEngine { async getBlobs(_fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]> { // retry only after a day may be - const GETBLOBS_RETRY_TIMEOUT = 256 * 12; + const GETBLOBS_RETRY_TIMEOUT = 256 * 32 * 12; const timeNow = Date.now() / 1000; const timeSinceLastFail = timeNow - this.lastGetBlobsErrorTime; if (timeSinceLastFail < GETBLOBS_RETRY_TIMEOUT) { From 11e21f0facb8cf1a8c036c07f2b8ff9055f942e8 Mon Sep 17 00:00:00 2001 From: harkamal Date: Sun, 3 Nov 2024 12:47:57 +0530 Subject: [PATCH 3/3] fix metric --- .../src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts index d0c3b10cfbbf..3bbe00bfc56b 100644 --- a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts +++ b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts @@ -231,7 +231,7 @@ export async function unavailableBeaconBlobsByRoot( networkResBlobSidecars = await network.sendBlobSidecarsByRoot(peerId, networkReqIdentifiers); metrics?.blockInputFetchStats.dataPromiseBlobsFinallyAvailableFromNetwork.inc(networkResBlobSidecars.length); if (blockTriedBefore) { - metrics?.blockInputFetchStats.dataPromiseBlobsRetriedAvailableFromNetwork.inc(networkReqIdentifiers.length); + metrics?.blockInputFetchStats.dataPromiseBlobsRetriedAvailableFromNetwork.inc(networkResBlobSidecars.length); } } else { networkResBlobSidecars = [];
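Summary of the fetch path this series introduces: for each missing blob index the code first checks the gossip blobsCache, then the engineGetBlobsCache, then asks the local EL via engine_getBlobsV1 using the blob's versioned hash, and only requests whatever is still missing from the peer via blob_sidecars_by_root (blocks retried a second time skip the engine and go straight to the network). The versioned hash sent to the engine is derived from the block's KZG commitment as specified in EIP-4844; a minimal sketch of what kzgCommitmentToVersionedHash in util/blobs.ts is assumed to do, using node:crypto for illustration:

    import {createHash} from "node:crypto";

    // EIP-4844: versioned_hash = VERSIONED_HASH_VERSION_KZG ++ sha256(kzg_commitment)[1:]
    const VERSIONED_HASH_VERSION_KZG = 0x01;

    function kzgCommitmentToVersionedHash(kzgCommitment: Uint8Array): Uint8Array {
      const hash = createHash("sha256").update(kzgCommitment).digest(); // 32 bytes
      hash[0] = VERSIONED_HASH_VERSION_KZG; // replace the first byte with the version byte
      return hash;
    }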