diff --git a/packages/beacon-node/src/chain/archiver/archiveBlocks.ts b/packages/beacon-node/src/chain/archiver/archiveBlocks.ts index 9814ff92d0cf..05f696c10a1d 100644 --- a/packages/beacon-node/src/chain/archiver/archiveBlocks.ts +++ b/packages/beacon-node/src/chain/archiver/archiveBlocks.ts @@ -2,8 +2,10 @@ import {fromHexString} from "@chainsafe/ssz"; import {Epoch, Slot} from "@lodestar/types"; import {IForkChoice} from "@lodestar/fork-choice"; import {ILogger, toHex} from "@lodestar/utils"; -import {SLOTS_PER_EPOCH} from "@lodestar/params"; -import {computeEpochAtSlot} from "@lodestar/state-transition"; +import {ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params"; +import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition"; +import {IKeyValue} from "@lodestar/db"; +import {IChainForkConfig} from "@lodestar/config"; import {IBeaconDb} from "../../db/index.js"; import {BlockArchiveBatchPutBinaryItem} from "../../db/repositories/index.js"; import {LightClientServer} from "../lightClient/index.js"; @@ -11,7 +13,8 @@ import {LightClientServer} from "../lightClient/index.js"; // Process in chunks to avoid OOM // this number of blocks per chunk is tested in e2e test blockArchive.test.ts // TODO: Review after merge since the size of blocks will increase significantly -const BATCH_SIZE = 256; +const BLOCK_BATCH_SIZE = 256; +const BLOB_SIDECAR_BATCH_SIZE = 32; type BlockRootSlot = {slot: Slot; root: Uint8Array}; @@ -25,16 +28,21 @@ type BlockRootSlot = {slot: Slot; root: Uint8Array}; * the next run should not reprocess finalzied block of this run. */ export async function archiveBlocks( + config: IChainForkConfig, db: IBeaconDb, forkChoice: IForkChoice, lightclientServer: LightClientServer, logger: ILogger, - finalizedCheckpoint: {rootHex: string; epoch: Epoch} + finalizedCheckpoint: {rootHex: string; epoch: Epoch}, + currentEpoch: Epoch ): Promise { // Use fork choice to determine the blocks to archive and delete const finalizedCanonicalBlocks = forkChoice.getAllAncestorBlocks(finalizedCheckpoint.rootHex); const finalizedNonCanonicalBlocks = forkChoice.getAllNonAncestorBlocks(finalizedCheckpoint.rootHex); + // NOTE: The finalized block will be exactly the first block of `epoch` or previous + const finalizedPostEIP4844 = finalizedCheckpoint.epoch >= config.EIP4844_FORK_EPOCH; + const finalizedCanonicalBlockRoots: BlockRootSlot[] = finalizedCanonicalBlocks.map((block) => ({ slot: block.slot, root: fromHexString(block.blockRoot), @@ -47,6 +55,11 @@ export async function archiveBlocks( toSlot: finalizedCanonicalBlockRoots[finalizedCanonicalBlockRoots.length - 1].slot, size: finalizedCanonicalBlockRoots.length, }); + + if (finalizedPostEIP4844) { + await migrateBlobsSidecarFromHotToColdDb(config, db, finalizedCanonicalBlockRoots); + logger.verbose("Migrated blobsSidecar from hot DB to cold DB"); + } } // deleteNonCanonicalBlocks @@ -55,9 +68,31 @@ export async function archiveBlocks( const nonCanonicalBlockRoots = finalizedNonCanonicalBlocks.map((summary) => fromHexString(summary.blockRoot)); if (nonCanonicalBlockRoots.length > 0) { await db.block.batchDelete(nonCanonicalBlockRoots); - logger.verbose("deleteNonCanonicalBlocks", { + logger.verbose("Deleted non canonical blocks from hot DB", { slots: finalizedNonCanonicalBlocks.map((summary) => summary.slot).join(","), }); + + if (finalizedPostEIP4844) { + await db.blobsSidecar.batchDelete(nonCanonicalBlockRoots); + logger.verbose("Deleted non canonical blobsSider from hot DB"); + } + } + + // Delete expired blobs + // Keep only `[max(GENESIS_EPOCH, current_epoch - MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS), current_epoch]` + if (finalizedPostEIP4844) { + const blobsSidecarMinEpoch = currentEpoch - config.MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; + if (blobsSidecarMinEpoch > 0) { + const slotsToDelete = await db.blobsSidecarArchive.keys({lt: computeStartSlotAtEpoch(blobsSidecarMinEpoch)}); + if (slotsToDelete.length > 0) { + await db.blobsSidecarArchive.batchDelete(slotsToDelete); + logger.verbose( + `blobsSidecar prune: batchDelete range ${slotsToDelete[0]}..${slotsToDelete[slotsToDelete.length - 1]}` + ); + } else { + logger.verbose(`blobsSidecar prune: no entries before epoch ${blobsSidecarMinEpoch}`); + } + } } // Prunning potential checkpoint data @@ -79,8 +114,8 @@ async function migrateBlocksFromHotToColdDb(db: IBeaconDb, blocks: BlockRootSlot // Start from `i=0`: 1st block in iterateAncestorBlocks() is the finalized block itself // we move it to blockArchive but forkchoice still have it to check next onBlock calls // the next iterateAncestorBlocks call does not return this block - for (let i = 0; i < blocks.length; i += BATCH_SIZE) { - const toIdx = Math.min(i + BATCH_SIZE, blocks.length); + for (let i = 0; i < blocks.length; i += BLOCK_BATCH_SIZE) { + const toIdx = Math.min(i + BLOCK_BATCH_SIZE, blocks.length); const canonicalBlocks = blocks.slice(i, toIdx); // processCanonicalBlocks @@ -112,6 +147,39 @@ async function migrateBlocksFromHotToColdDb(db: IBeaconDb, blocks: BlockRootSlot } } +async function migrateBlobsSidecarFromHotToColdDb( + config: IChainForkConfig, + db: IBeaconDb, + blocks: BlockRootSlot[] +): Promise { + for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) { + const toIdx = Math.min(i + BLOB_SIDECAR_BATCH_SIZE, blocks.length); + const canonicalBlocks = blocks.slice(i, toIdx); + + // processCanonicalBlocks + if (canonicalBlocks.length === 0) return; + + // load Buffer instead of ssz deserialized to improve performance + const canonicalBlobsSidecarEntries: IKeyValue[] = await Promise.all( + canonicalBlocks + .filter((block) => config.getForkSeq(block.slot) >= ForkSeq.eip4844) + .map(async (block) => { + const bytes = await db.blobsSidecar.getBinary(block.root); + if (!bytes) { + throw Error(`No blobsSidecar found for slot ${block.slot} root ${toHex(block.root)}`); + } + return {key: block.slot, value: bytes}; + }) + ); + + // put to blockArchive db and delete block db + await Promise.all([ + db.blobsSidecarArchive.batchPutBinary(canonicalBlobsSidecarEntries), + db.blobsSidecar.batchDelete(canonicalBlocks.map((block) => block.root)), + ]); + } +} + /** * ``` * class SignedBeaconBlock(Container): diff --git a/packages/beacon-node/src/chain/archiver/index.ts b/packages/beacon-node/src/chain/archiver/index.ts index d2b8a575e6b8..fdff0de6ef3b 100644 --- a/packages/beacon-node/src/chain/archiver/index.ts +++ b/packages/beacon-node/src/chain/archiver/index.ts @@ -73,7 +73,15 @@ export class Archiver { try { const finalizedEpoch = finalized.epoch; this.logger.verbose("Start processing finalized checkpoint", {epoch: finalizedEpoch}); - await archiveBlocks(this.db, this.chain.forkChoice, this.chain.lightClientServer, this.logger, finalized); + await archiveBlocks( + this.chain.config, + this.db, + this.chain.forkChoice, + this.chain.lightClientServer, + this.logger, + finalized, + this.chain.clock.currentEpoch + ); // should be after ArchiveBlocksTask to handle restart cleanly await this.statesArchiver.maybeArchiveState(finalized); diff --git a/packages/beacon-node/src/db/beacon.ts b/packages/beacon-node/src/db/beacon.ts index e4ad33ce2f43..4f4cf723749f 100644 --- a/packages/beacon-node/src/db/beacon.ts +++ b/packages/beacon-node/src/db/beacon.ts @@ -75,4 +75,12 @@ export class BeaconDb extends DatabaseService implements IBeaconDb { async stop(): Promise { await super.stop(); } + + async pruneHotDb(): Promise { + // Prune all hot blobs + await this.blobsSidecar.batchDelete(await this.blobsSidecar.keys()); + // Prune all hot blocks + // TODO: Enable once it's deemed safe + // await this.block.batchDelete(await this.block.keys()); + } } diff --git a/packages/beacon-node/src/db/interface.ts b/packages/beacon-node/src/db/interface.ts index 59b9c3630805..0a3c885db81c 100644 --- a/packages/beacon-node/src/db/interface.ts +++ b/packages/beacon-node/src/db/interface.ts @@ -58,6 +58,8 @@ export interface IBeaconDb { backfilledRanges: BackfilledRanges; + pruneHotDb(): Promise; + /** Start the connection to the db instance and open the db store. */ start(): Promise; /** Stop the connection to the db instance and close the db store. */ diff --git a/packages/beacon-node/src/node/nodejs.ts b/packages/beacon-node/src/node/nodejs.ts index 2d205649b78a..e245ef794c2c 100644 --- a/packages/beacon-node/src/node/nodejs.ts +++ b/packages/beacon-node/src/node/nodejs.ts @@ -149,6 +149,9 @@ export class BeaconNode { // start db if not already started await db.start(); + // Prune hot db repos + // TODO: Should this call be awaited? + await db.pruneHotDb(); let metrics = null; if (opts.metrics.enabled) { diff --git a/packages/beacon-node/test/unit/chain/archive/blockArchiver.test.ts b/packages/beacon-node/test/unit/chain/archive/blockArchiver.test.ts index 9dca61335960..a7c3b99631f7 100644 --- a/packages/beacon-node/test/unit/chain/archive/blockArchiver.test.ts +++ b/packages/beacon-node/test/unit/chain/archive/blockArchiver.test.ts @@ -2,6 +2,7 @@ import {expect} from "chai"; import sinon, {SinonStubbedInstance} from "sinon"; import {ssz} from "@lodestar/types"; import {ForkChoice} from "@lodestar/fork-choice"; +import {config} from "@lodestar/config/default"; import {fromHexString, toHexString} from "@chainsafe/ssz"; import {ZERO_HASH_HEX} from "../../../../src/constants/index.js"; import {generateProtoBlock, generateEmptySignedBlock} from "../../../utils/block.js"; @@ -33,9 +34,18 @@ describe("block archiver task", function () { ); const canonicalBlocks = [blocks[4], blocks[3], blocks[1], blocks[0]]; const nonCanonicalBlocks = [blocks[2]]; + const currentEpoch = 8; forkChoiceStub.getAllAncestorBlocks.returns(canonicalBlocks); forkChoiceStub.getAllNonAncestorBlocks.returns(nonCanonicalBlocks); - await archiveBlocks(dbStub, forkChoiceStub, lightclientServer, logger, {epoch: 5, rootHex: ZERO_HASH_HEX}); + await archiveBlocks( + config, + dbStub, + forkChoiceStub, + lightclientServer, + logger, + {epoch: 5, rootHex: ZERO_HASH_HEX}, + currentEpoch + ); expect(dbStub.blockArchive.batchPutBinary.getCall(0).args[0]).to.deep.equal( canonicalBlocks.map((summary) => ({ diff --git a/packages/beacon-node/test/utils/mocks/db.ts b/packages/beacon-node/test/utils/mocks/db.ts index b05314127d33..ad53fb75d28e 100644 --- a/packages/beacon-node/test/utils/mocks/db.ts +++ b/packages/beacon-node/test/utils/mocks/db.ts @@ -66,5 +66,6 @@ export function getStubbedBeaconDb(): IBeaconDb { async stop(): Promise {}, /** To inject metrics after CLI initialization */ setMetrics(): void {}, + async pruneHotDb(): Promise {}, }; }