Skip to content

Commit

Permalink
Skip serializing block after fetching from network
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Jun 1, 2023
1 parent 7bf0b75 commit ec3869f
Show file tree
Hide file tree
Showing 29 changed files with 170 additions and 124 deletions.
6 changes: 4 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,14 @@ export function getBeaconBlockApi({
signedBlock,
BlockSource.api,
// The blobsSidecar will be replaced in the followup PRs with just blobs
blobsSidecar
blobsSidecar,
null
);
} else {
signedBlock = signedBlockOrContents as allForks.SignedBeaconBlock;
signedBlobs = [];
blockForImport = getBlockInput.preDeneb(config, signedBlock, BlockSource.api);
// TODO: Once API supports submitting data as SSZ, replace null with blockBytes
blockForImport = getBlockInput.preDeneb(config, signedBlock, BlockSource.api, null);
}

// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
Expand Down
10 changes: 5 additions & 5 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {WithOptionalBytes, allForks} from "@lodestar/types";
import {allForks} from "@lodestar/types";
import {toHex, isErrorAborted} from "@lodestar/utils";
import {JobItemQueue, isQueueErrorAborted} from "../../util/queue/index.js";
import {Metrics} from "../../metrics/metrics.js";
Expand All @@ -19,10 +19,10 @@ const QUEUE_MAX_LENGTH = 256;
* BlockProcessor processes block jobs in a queued fashion, one after the other.
*/
export class BlockProcessor {
readonly jobQueue: JobItemQueue<[WithOptionalBytes<BlockInput>[], ImportBlockOpts], void>;
readonly jobQueue: JobItemQueue<[BlockInput[], ImportBlockOpts], void>;

constructor(chain: BeaconChain, metrics: Metrics | null, opts: BlockProcessOpts, signal: AbortSignal) {
this.jobQueue = new JobItemQueue<[WithOptionalBytes<BlockInput>[], ImportBlockOpts], void>(
this.jobQueue = new JobItemQueue<[BlockInput[], ImportBlockOpts], void>(
(job, importOpts) => {
return processBlocks.call(chain, job, {...opts, ...importOpts});
},
Expand All @@ -31,7 +31,7 @@ export class BlockProcessor {
);
}

async processBlocksJob(job: WithOptionalBytes<BlockInput>[], opts: ImportBlockOpts = {}): Promise<void> {
async processBlocksJob(job: BlockInput[], opts: ImportBlockOpts = {}): Promise<void> {
await this.jobQueue.push(job, opts);
}
}
Expand All @@ -48,7 +48,7 @@ export class BlockProcessor {
*/
export async function processBlocks(
this: BeaconChain,
blocks: WithOptionalBytes<BlockInput>[],
blocks: BlockInput[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<void> {
if (blocks.length === 0) {
Expand Down
23 changes: 16 additions & 7 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, deneb, Slot, WithOptionalBytes} from "@lodestar/types";
import {allForks, deneb, Slot} from "@lodestar/types";
import {ForkSeq, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";

Expand All @@ -19,9 +19,10 @@ export enum BlockSource {
byRoot = "req_resp_by_root",
}

export type BlockInput =
| {type: BlockInputType.preDeneb; block: allForks.SignedBeaconBlock; source: BlockSource}
| {type: BlockInputType.postDeneb; block: allForks.SignedBeaconBlock; source: BlockSource; blobs: deneb.BlobsSidecar};
export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preDeneb}
| {type: BlockInputType.postDeneb; blobs: deneb.BlobsSidecar}
);

export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean {
return (
Expand Down Expand Up @@ -51,22 +52,29 @@ export function blobSidecarsToBlobsSidecar(
}

export const getBlockInput = {
preDeneb(config: ChainForkConfig, block: allForks.SignedBeaconBlock, source: BlockSource): BlockInput {
preDeneb(
config: ChainForkConfig,
block: allForks.SignedBeaconBlock,
source: BlockSource,
blockBytes: Uint8Array | null
): BlockInput {
if (config.getForkSeq(block.message.slot) >= ForkSeq.deneb) {
throw Error(`Post Deneb block slot ${block.message.slot}`);
}
return {
type: BlockInputType.preDeneb,
block,
source,
blockBytes,
};
},

postDeneb(
config: ChainForkConfig,
block: allForks.SignedBeaconBlock,
source: BlockSource,
blobs: deneb.BlobsSidecar
blobs: deneb.BlobsSidecar,
blockBytes: Uint8Array | null
): BlockInput {
if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) {
throw Error(`Pre Deneb block slot ${block.message.slot}`);
Expand All @@ -76,6 +84,7 @@ export const getBlockInput = {
block,
source,
blobs,
blockBytes,
};
},
};
Expand Down Expand Up @@ -130,7 +139,7 @@ export type ImportBlockOpts = {
* A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and ready to import
*/
export type FullyVerifiedBlock = {
blockInput: WithOptionalBytes<BlockInput>;
blockInput: BlockInput;
postState: CachedBeaconStateAllForks;
parentBlockSlot: Slot;
proposerBalanceDelta: number;
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
isStateValidatorsNodesPopulated,
DataAvailableStatus,
} from "@lodestar/state-transition";
import {WithOptionalBytes, bellatrix} from "@lodestar/types";
import {bellatrix} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {toHexString} from "@chainsafe/ssz";
import {ProtoBlock} from "@lodestar/fork-choice";
Expand Down Expand Up @@ -37,7 +37,7 @@ import {writeBlockInputToDb} from "./writeBlockInputToDb.js";
export async function verifyBlocksInEpoch(
this: BeaconChain,
parentBlock: ProtoBlock,
blocksInput: WithOptionalBytes<BlockInput>[],
blocksInput: BlockInput[],
dataAvailabilityStatuses: DataAvailableStatus[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {computeStartSlotAtEpoch, DataAvailableStatus} from "@lodestar/state-transition";
import {ChainForkConfig} from "@lodestar/config";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {Slot, deneb, WithOptionalBytes} from "@lodestar/types";
import {Slot, deneb} from "@lodestar/types";
import {toHexString} from "@lodestar/utils";
import {IClock} from "../../util/clock.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
Expand All @@ -22,10 +22,10 @@ import {BlockInput, BlockInputType, ImportBlockOpts} from "./types.js";
*/
export function verifyBlocksSanityChecks(
chain: {forkChoice: IForkChoice; clock: IClock; config: ChainForkConfig},
blocks: WithOptionalBytes<BlockInput>[],
blocks: BlockInput[],
opts: ImportBlockOpts
): {
relevantBlocks: WithOptionalBytes<BlockInput>[];
relevantBlocks: BlockInput[];
dataAvailabilityStatuses: DataAvailableStatus[];
parentSlots: Slot[];
parentBlock: ProtoBlock | null;
Expand All @@ -34,7 +34,7 @@ export function verifyBlocksSanityChecks(
throw Error("Empty partiallyVerifiedBlocks");
}

const relevantBlocks: WithOptionalBytes<BlockInput>[] = [];
const relevantBlocks: BlockInput[] = [];
const dataAvailabilityStatuses: DataAvailableStatus[] = [];
const parentSlots: Slot[] = [];
let parentBlock: ProtoBlock | null = null;
Expand Down
18 changes: 6 additions & 12 deletions packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {WithOptionalBytes, allForks, deneb} from "@lodestar/types";
import {allForks, deneb} from "@lodestar/types";
import {toHex} from "@lodestar/utils";
import {BeaconChain} from "../chain.js";
import {BlockInput, BlockInputType} from "./types.js";
Expand All @@ -10,20 +10,17 @@ import {BlockInput, BlockInputType} from "./types.js";
* This operation may be performed before, during or after importing to the fork-choice. As long as errors
* are handled properly for eventual consistency.
*/
export async function writeBlockInputToDb(
this: BeaconChain,
blocksInput: WithOptionalBytes<BlockInput>[]
): Promise<void> {
export async function writeBlockInputToDb(this: BeaconChain, blocksInput: BlockInput[]): Promise<void> {
const fnPromises: Promise<void>[] = [];

for (const blockInput of blocksInput) {
const {block, serializedData, type} = blockInput;
const {block, blockBytes, type} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHex(blockRoot);
if (serializedData) {
if (blockBytes) {
// skip serializing data if we already have it
this.metrics?.importBlock.persistBlockWithSerializedDataCount.inc();
fnPromises.push(this.db.block.putBinary(this.db.block.getId(block), serializedData));
fnPromises.push(this.db.block.putBinary(this.db.block.getId(block), blockBytes));
} else {
this.metrics?.importBlock.persistBlockNoSerializedDataCount.inc();
fnPromises.push(this.db.block.add(block));
Expand Down Expand Up @@ -51,10 +48,7 @@ export async function writeBlockInputToDb(
/**
* Prunes eagerly persisted block inputs only if not known to the fork-choice
*/
export async function removeEagerlyPersistedBlockInputs(
this: BeaconChain,
blockInputs: WithOptionalBytes<BlockInput>[]
): Promise<void> {
export async function removeEagerlyPersistedBlockInputs(this: BeaconChain, blockInputs: BlockInput[]): Promise<void> {
const blockToRemove: allForks.SignedBeaconBlock[] = [];
const blobsToRemove: deneb.BlobsSidecar[] = [];

Expand Down
5 changes: 2 additions & 3 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
ValidatorIndex,
deneb,
Wei,
WithOptionalBytes,
bellatrix,
} from "@lodestar/types";
import {CheckpointWithHex, ExecutionStatus, IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
Expand Down Expand Up @@ -564,11 +563,11 @@ export class BeaconChain implements IBeaconChain {
return blobSidecars;
}

async processBlock(block: WithOptionalBytes<BlockInput>, opts?: ImportBlockOpts): Promise<void> {
async processBlock(block: BlockInput, opts?: ImportBlockOpts): Promise<void> {
return this.blockProcessor.processBlocksJob([block], opts);
}

async processChainSegment(blocks: WithOptionalBytes<BlockInput>[], opts?: ImportBlockOpts): Promise<void> {
async processChainSegment(blocks: BlockInput[], opts?: ImportBlockOpts): Promise<void> {
return this.blockProcessor.processBlocksJob(blocks, opts);
}

Expand Down
18 changes: 3 additions & 15 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,4 @@
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
deneb,
Wei,
WithOptionalBytes,
} from "@lodestar/types";
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, deneb, Wei} from "@lodestar/types";
import {
BeaconStateAllForks,
CachedBeaconStateAllForks,
Expand Down Expand Up @@ -155,9 +143,9 @@ export interface IBeaconChain {
produceBlindedBlock(blockAttributes: BlockAttributes): Promise<{block: allForks.BlindedBeaconBlock; blockValue: Wei}>;

/** Process a block until complete */
processBlock(block: WithOptionalBytes<BlockInput>, opts?: ImportBlockOpts): Promise<void>;
processBlock(block: BlockInput, opts?: ImportBlockOpts): Promise<void>;
/** Process a chain of blocks until complete */
processChainSegment(blocks: WithOptionalBytes<BlockInput>[], opts?: ImportBlockOpts): Promise<void>;
processChainSegment(blocks: BlockInput[], opts?: ImportBlockOpts): Promise<void>;

getStatus(): phase0.Status;

Expand Down
6 changes: 4 additions & 2 deletions packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {GossipType} from "./gossip/interface.js";
import {PendingGossipsubMessage} from "./processor/types.js";
import {PeerAction} from "./peers/index.js";

export type WithBytes<T> = {data: T; bytes: Uint8Array};

/**
* The architecture of the network looks like so:
* - core:
Expand All @@ -35,11 +37,11 @@ export interface INetwork extends INetworkCorePublic {
sendBeaconBlocksByRange(
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRangeRequest
): Promise<allForks.SignedBeaconBlock[]>;
): Promise<WithBytes<allForks.SignedBeaconBlock>[]>;
sendBeaconBlocksByRoot(
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRootRequest
): Promise<allForks.SignedBeaconBlock[]>;
): Promise<WithBytes<allForks.SignedBeaconBlock>[]>;
sendBlobSidecarsByRange(peerId: PeerIdStr, request: deneb.BlobSidecarsByRangeRequest): Promise<deneb.BlobSidecar[]>;
sendBlobSidecarsByRoot(peerId: PeerIdStr, request: deneb.BlobSidecarsByRootRequest): Promise<deneb.BlobSidecar[]>;

Expand Down
14 changes: 9 additions & 5 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {PeerIdStr, peerIdToString} from "../util/peerId.js";
import {IClock} from "../util/clock.js";
import {BlockInput, BlockInputType} from "../chain/blocks/types.js";
import {NetworkOptions} from "./options.js";
import {INetwork} from "./interface.js";
import {WithBytes, INetwork} from "./interface.js";
import {ReqRespMethod} from "./reqresp/index.js";
import {GossipHandlers, GossipTopicMap, GossipType, GossipTypeMap} from "./gossip/index.js";
import {PeerAction, PeerScoreStats} from "./peers/index.js";
Expand All @@ -25,7 +25,11 @@ import {CommitteeSubscription} from "./subnets/index.js";
import {isPublishToZeroPeersError} from "./util.js";
import {NetworkProcessor, PendingGossipsubMessage} from "./processor/index.js";
import {INetworkCore, NetworkCore, WorkerNetworkCore} from "./core/index.js";
import {collectExactOneTyped, collectMaxResponseTyped} from "./reqresp/utils/collect.js";
import {
collectExactOneTyped,
collectMaxResponseTyped,
collectMaxResponseTypedWithBytes,
} from "./reqresp/utils/collect.js";
import {GetReqRespHandlerFn, Version, requestSszTypeByMethod, responseSszTypeByMethod} from "./reqresp/types.js";
import {collectSequentialBlocksInRange} from "./reqresp/utils/collectSequentialBlocksInRange.js";
import {getGossipSSZType, gossipTopicIgnoreDuplicatePublishError, stringifyGossipTopic} from "./gossip/topic.js";
Expand Down Expand Up @@ -409,7 +413,7 @@ export class Network implements INetwork {
async sendBeaconBlocksByRange(
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRangeRequest
): Promise<allForks.SignedBeaconBlock[]> {
): Promise<WithBytes<allForks.SignedBeaconBlock>[]> {
return collectSequentialBlocksInRange(
this.sendReqRespRequest(
peerId,
Expand All @@ -425,8 +429,8 @@ export class Network implements INetwork {
async sendBeaconBlocksByRoot(
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRootRequest
): Promise<allForks.SignedBeaconBlock[]> {
return collectMaxResponseTyped(
): Promise<WithBytes<allForks.SignedBeaconBlock>[]> {
return collectMaxResponseTypedWithBytes(
this.sendReqRespRequest(
peerId,
ReqRespMethod.BeaconBlocksByRoot,
Expand Down
17 changes: 7 additions & 10 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {toHexString} from "@chainsafe/ssz";
import {BeaconConfig} from "@lodestar/config";
import {Logger, prettyBytes} from "@lodestar/utils";
import {Root, Slot, ssz, WithBytes} from "@lodestar/types";
import {Root, Slot, ssz} from "@lodestar/types";
import {ForkName, ForkSeq} from "@lodestar/params";
import {Metrics} from "../../metrics/index.js";
import {OpSource} from "../../metrics/validatorMonitor.js";
Expand Down Expand Up @@ -125,11 +125,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
}
}

function handleValidBeaconBlock(
blockInput: WithBytes<BlockInput>,
peerIdStr: string,
seenTimestampSec: number
): void {
function handleValidBeaconBlock(blockInput: BlockInput, peerIdStr: string, seenTimestampSec: number): void {
const signedBlock = blockInput.block;

// Handler - MUST NOT `await`, to allow validation result to be propagated
Expand Down Expand Up @@ -185,9 +181,9 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
throw new GossipActionError(GossipAction.REJECT, {code: "POST_DENEB_BLOCK"});
}

const blockInput = getBlockInput.preDeneb(config, signedBlock, BlockSource.gossip);
const blockInput = getBlockInput.preDeneb(config, signedBlock, BlockSource.gossip, serializedData);
await validateBeaconBlock(blockInput, topic.fork, peerIdStr, seenTimestampSec);
handleValidBeaconBlock({...blockInput, serializedData}, peerIdStr, seenTimestampSec);
handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec);
},

[GossipType.beacon_block_and_blobs_sidecar]: async ({serializedData}, topic, peerIdStr, seenTimestampSec) => {
Expand All @@ -199,10 +195,11 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
}

// Validate block + blob. Then forward, then handle both
const blockInput = getBlockInput.postDeneb(config, beaconBlock, BlockSource.gossip, blobsSidecar);
// TODO DENEB: replace null with proper binary data for block and blobs separately
const blockInput = getBlockInput.postDeneb(config, beaconBlock, BlockSource.gossip, blobsSidecar, null);
await validateBeaconBlock(blockInput, topic.fork, peerIdStr, seenTimestampSec);
validateGossipBlobsSidecar(beaconBlock, blobsSidecar);
handleValidBeaconBlock({...blockInput, serializedData}, peerIdStr, seenTimestampSec);
handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec);
},

[GossipType.beacon_aggregate_and_proof]: async ({serializedData}, topic, _peer, seenTimestampSec) => {
Expand Down
Loading

0 comments on commit ec3869f

Please sign in to comment.