Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add and use getBlobsV1 to expedite gossip import #7134

Merged
merged 3 commits into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ export function getBeaconBlockApi({
// specification is very clear that this is the desired behaviour.
//
// i) Publish blobs and block before importing so that network can see them asap
// ii) publish blobs first because
// a) by the times nodes see block, they might decide to pull blobs
// b) they might require more hops to reach recipients in peerDAS kind of setup where
// blobs might need to hop between nodes because of partial subnet subscription
...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)),
// ii) publish block first because
// a) as soon as node sees block they can start processing it while blobs arrive
// b) getting block first allows nodes to use getBlobs from local ELs and save
// import latency and hopefully bandwidth
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)),
() =>
// there is no rush to persist block since we published it to gossip anyway
chain
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export async function verifyBlocksInEpoch(
} as SegmentExecStatus),

// data availability for the blobs
verifyBlocksDataAvailability(this, blocksInput, opts),
verifyBlocksDataAvailability(this, blocksInput, abortController.signal, opts),

// Run state transition only
// TODO: Ensure it yields to allow flushing to workers and engine API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {ChainForkConfig} from "@lodestar/config";
import {DataAvailabilityStatus} from "@lodestar/fork-choice";
import {computeTimeAtSlot} from "@lodestar/state-transition";
import {UintNum64, deneb} from "@lodestar/types";
import {Logger} from "@lodestar/utils";
import {ErrorAborted, Logger} from "@lodestar/utils";
import {Metrics} from "../../metrics/metrics.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {validateBlobSidecars} from "../validation/blobSidecar.js";
Expand All @@ -27,6 +27,7 @@ const BLOB_AVAILABILITY_TIMEOUT = 12_000;
export async function verifyBlocksDataAvailability(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger; metrics: Metrics | null},
blocks: BlockInput[],
signal: AbortSignal,
opts: ImportBlockOpts
): Promise<{
dataAvailabilityStatuses: DataAvailabilityStatus[];
Expand All @@ -43,9 +44,12 @@ export async function verifyBlocksDataAvailability(
const availableBlockInputs: BlockInput[] = [];

for (const blockInput of blocks) {
if (signal.aborted) {
throw new ErrorAborted("verifyBlocksDataAvailability");
}
// Validate status of only not yet finalized blocks, we don't need yet to propogate the status
// as it is not used upstream anywhere
const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, opts);
const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, signal, opts);
dataAvailabilityStatuses.push(dataAvailabilityStatus);
availableBlockInputs.push(availableBlockInput);
}
Expand All @@ -69,6 +73,7 @@ export async function verifyBlocksDataAvailability(
async function maybeValidateBlobs(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
signal: AbortSignal,
opts: ImportBlockOpts
): Promise<{dataAvailabilityStatus: DataAvailabilityStatus; availableBlockInput: BlockInput}> {
switch (blockInput.type) {
Expand All @@ -92,7 +97,7 @@ async function maybeValidateBlobs(
const blobsData =
blockInput.type === BlockInputType.availableData
? blockInput.blockData
: await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise);
: await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise, signal);
const {blobs} = blobsData;

const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body;
Expand Down Expand Up @@ -122,16 +127,21 @@ async function maybeValidateBlobs(
async function raceWithCutoff<T>(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
availabilityPromise: Promise<T>
availabilityPromise: Promise<T>,
signal: AbortSignal
): Promise<T> {
const {block} = blockInput;
const blockSlot = block.message.slot;

const cutoffTime = Math.max(
computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now(),
0
);
const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTime));
const cutoffTime =
computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now();
g11tech marked this conversation as resolved.
Show resolved Hide resolved
const cutoffTimeout =
cutoffTime > 0
? new Promise((_resolve, reject) => {
setTimeout(() => reject(new Error("Timeout exceeded")), cutoffTime);
signal.addEventListener("abort", () => reject(signal.reason));
})
: Promise.reject(new Error("Cutoff time must be greater than 0"));
chain.logger.debug("Racing for blob availabilityPromise", {blockSlot, cutoffTime});

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ export class HttpRpcError extends Error {
/**
* JSON RPC spec errors https://www.jsonrpc.org/specification#response_object
*/
function parseJsonRpcErrorCode(code: number): string {
export function parseJsonRpcErrorCode(code: number): string {
if (code === -32700) return "Parse request error";
if (code === -32600) return "Invalid request object";
if (code === -32601) return "Method not found";
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/execution/engine/disabled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ export class ExecutionEngineDisabled implements IExecutionEngine {
getPayloadBodiesByRange(): Promise<never> {
throw Error("Execution engine disabled");
}

getBlobs(): Promise<never> {
throw Error("Execution engine disabled");
}
}
57 changes: 56 additions & 1 deletion packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import {Logger} from "@lodestar/logger";
import {ForkName, ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params";
import {ExecutionPayload, ExecutionRequests, Root, RootHex, Wei} from "@lodestar/types";
import {BlobAndProof} from "@lodestar/types/deneb";
import {
ErrorJsonRpcResponse,
HttpRpcError,
IJsonRpcHttpClient,
JsonRpcHttpClientEvent,
ReqOpts,
parseJsonRpcErrorCode,
} from "../../eth1/provider/jsonRpcHttpClient.js";
import {numToQuantity} from "../../eth1/provider/utils.js";
import {bytesToData, numToQuantity} from "../../eth1/provider/utils.js";
import {Metrics} from "../../metrics/index.js";
import {EPOCHS_PER_BATCH} from "../../sync/constants.js";
import {getLodestarClientVersion} from "../../util/metadata.js";
Expand All @@ -31,6 +33,7 @@ import {
EngineApiRpcReturnTypes,
ExecutionPayloadBody,
assertReqSizeLimit,
deserializeBlobAndProofs,
deserializeExecutionPayloadBody,
parseExecutionPayload,
serializeBeaconBlockRoot,
Expand Down Expand Up @@ -111,6 +114,7 @@ const getPayloadOpts: ReqOpts = {routeId: "getPayload"};
*/
export class ExecutionEngineHttp implements IExecutionEngine {
private logger: Logger;
private lastGetBlobsErrorTime = 0;

// The default state is ONLINE, it will be updated to SYNCING once we receive the first payload
// This assumption is better than the OFFLINE state, since we can't be sure if the EL is offline and being offline may trigger some notifications
Expand Down Expand Up @@ -461,6 +465,57 @@ export class ExecutionEngineHttp implements IExecutionEngine {
return response.map(deserializeExecutionPayloadBody);
}

async getBlobs(_fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]> {
g11tech marked this conversation as resolved.
Show resolved Hide resolved
// retry only after a day may be
const GETBLOBS_RETRY_TIMEOUT = 256 * 32 * 12;
const timeNow = Date.now() / 1000;
const timeSinceLastFail = timeNow - this.lastGetBlobsErrorTime;
if (timeSinceLastFail < GETBLOBS_RETRY_TIMEOUT) {
// do not try getblobs since it might not be available
this.logger.debug(
`disabled engine_getBlobsV1 api call since last failed < GETBLOBS_RETRY_TIMEOUT=${GETBLOBS_RETRY_TIMEOUT}`,
timeSinceLastFail
);
throw Error(
`engine_getBlobsV1 call recently failed timeSinceLastFail=${timeSinceLastFail} < GETBLOBS_RETRY_TIMEOUT=${GETBLOBS_RETRY_TIMEOUT}`
);
}

const method = "engine_getBlobsV1";
assertReqSizeLimit(versionedHashes.length, 128);
const versionedHashesHex = versionedHashes.map(bytesToData);
let response = await this.rpc
.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>({
method,
params: [versionedHashesHex],
})
.catch((e) => {
if (e instanceof ErrorJsonRpcResponse && parseJsonRpcErrorCode(e.response.error.code) === "Method not found") {
this.lastGetBlobsErrorTime = timeNow;
this.logger.debug("disabling engine_getBlobsV1 api call since engine responded with method not availeble", {
retryTimeout: GETBLOBS_RETRY_TIMEOUT,
});
}
throw e;
});

// handle nethermind buggy response
g11tech marked this conversation as resolved.
Show resolved Hide resolved
// see: https://discord.com/channels/595666850260713488/1293605631785304088/1298956894274060301
if (
(response as unknown as {blobsAndProofs: EngineApiRpcReturnTypes[typeof method]}).blobsAndProofs !== undefined
) {
response = (response as unknown as {blobsAndProofs: EngineApiRpcReturnTypes[typeof method]}).blobsAndProofs;
}

if (response.length !== versionedHashes.length) {
const error = `Invalid engine_getBlobsV1 response length=${response.length} versionedHashes=${versionedHashes.length}`;
this.logger.error(error);
throw Error(error);
}

return response.map(deserializeBlobAndProofs);
}

private async getClientVersion(clientVersion: ClientVersion): Promise<ClientVersion[]> {
const method = "engine_getClientVersionV1";

Expand Down
4 changes: 3 additions & 1 deletion packages/beacon-node/src/execution/engine/interface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {ForkName} from "@lodestar/params";
import {ExecutionPayload, ExecutionRequests, Root, RootHex, Wei, capella} from "@lodestar/types";
import {Blob, KZGCommitment, KZGProof} from "@lodestar/types/deneb";
import {Blob, BlobAndProof, KZGCommitment, KZGProof} from "@lodestar/types/deneb";

import {DATA} from "../../eth1/provider/utils.js";
import {PayloadId, PayloadIdCache, WithdrawalV1} from "./payloadIdCache.js";
Expand Down Expand Up @@ -179,4 +179,6 @@ export interface IExecutionEngine {
getPayloadBodiesByHash(fork: ForkName, blockHash: DATA[]): Promise<(ExecutionPayloadBody | null)[]>;

getPayloadBodiesByRange(fork: ForkName, start: number, count: number): Promise<(ExecutionPayloadBody | null)[]>;

getBlobs(fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]>;
}
7 changes: 7 additions & 0 deletions packages/beacon-node/src/execution/engine/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend {
engine_getPayloadBodiesByHashV1: this.getPayloadBodiesByHash.bind(this),
engine_getPayloadBodiesByRangeV1: this.getPayloadBodiesByRange.bind(this),
engine_getClientVersionV1: this.getClientVersionV1.bind(this),
engine_getBlobsV1: this.getBlobs.bind(this),
};
}

Expand Down Expand Up @@ -397,6 +398,12 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend {
return [{code: ClientCode.XX, name: "mock", version: "", commit: ""}];
}

private getBlobs(
versionedHashes: EngineApiRpcParamTypes["engine_getBlobsV1"][0]
): EngineApiRpcReturnTypes["engine_getBlobsV1"] {
return versionedHashes.map((_vh) => null);
}

private timestampToFork(timestamp: number): ForkExecution {
if (timestamp > (this.opts.electraForkTimestamp ?? Infinity)) return ForkName.electra;
if (timestamp > (this.opts.denebForkTimestamp ?? Infinity)) return ForkName.deneb;
Expand Down
19 changes: 19 additions & 0 deletions packages/beacon-node/src/execution/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
ForkSeq,
} from "@lodestar/params";
import {ExecutionPayload, ExecutionRequests, Root, Wei, bellatrix, capella, deneb, electra, ssz} from "@lodestar/types";
import {BlobAndProof} from "@lodestar/types/deneb";

import {
DATA,
Expand Down Expand Up @@ -67,6 +68,8 @@ export type EngineApiRpcParamTypes = {
* Object - Instance of ClientVersion
*/
engine_getClientVersionV1: [ClientVersionRpc];

engine_getBlobsV1: [DATA[]];
};

export type PayloadStatus = {
Expand Down Expand Up @@ -109,6 +112,8 @@ export type EngineApiRpcReturnTypes = {
engine_getPayloadBodiesByRangeV1: (ExecutionPayloadBodyRpc | null)[];

engine_getClientVersionV1: ClientVersionRpc[];

engine_getBlobsV1: (BlobAndProofRpc | null)[];
};

type ExecutionPayloadRpcWithValue = {
Expand Down Expand Up @@ -171,6 +176,11 @@ export type DepositRequestsRpc = DATA;
export type WithdrawalRequestsRpc = DATA;
export type ConsolidationRequestsRpc = DATA;

export type BlobAndProofRpc = {
blob: DATA;
proof: DATA;
};

export type VersionedHashesRpc = DATA[];

export type PayloadAttributesRpc = {
Expand Down Expand Up @@ -462,6 +472,15 @@ export function serializeExecutionPayloadBody(data: ExecutionPayloadBody | null)
: null;
}

export function deserializeBlobAndProofs(data: BlobAndProofRpc | null): BlobAndProof | null {
return data
? {
blob: dataToBytes(data.blob, BYTES_PER_FIELD_ELEMENT * FIELD_ELEMENTS_PER_BLOB),
proof: dataToBytes(data.proof, 48),
}
: null;
}

export function assertReqSizeLimit(blockHashesReqCount: number, count: number): void {
if (blockHashesReqCount > count) {
throw new Error(`Requested blocks must not be > ${count}`);
Expand Down
Loading
Loading