Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EIP-4844: Archive hot blobs and prune old blobs #4862

Merged
merged 3 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 75 additions & 7 deletions packages/beacon-node/src/chain/archiver/archiveBlocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ import {fromHexString} from "@chainsafe/ssz";
import {Epoch, Slot} from "@lodestar/types";
import {IForkChoice} from "@lodestar/fork-choice";
import {ILogger, toHex} from "@lodestar/utils";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {IKeyValue} from "@lodestar/db";
import {IChainForkConfig} from "@lodestar/config";
import {IBeaconDb} from "../../db/index.js";
import {BlockArchiveBatchPutBinaryItem} from "../../db/repositories/index.js";
import {LightClientServer} from "../lightClient/index.js";

// Process in chunks to avoid OOM
// this number of blocks per chunk is tested in e2e test blockArchive.test.ts
// TODO: Review after merge since the size of blocks will increase significantly
const BATCH_SIZE = 256;
const BLOCK_BATCH_SIZE = 256;
const BLOB_SIDECAR_BATCH_SIZE = 32;

type BlockRootSlot = {slot: Slot; root: Uint8Array};

Expand All @@ -25,16 +28,21 @@ type BlockRootSlot = {slot: Slot; root: Uint8Array};
* the next run should not reprocess finalzied block of this run.
*/
export async function archiveBlocks(
config: IChainForkConfig,
db: IBeaconDb,
forkChoice: IForkChoice,
lightclientServer: LightClientServer,
logger: ILogger,
finalizedCheckpoint: {rootHex: string; epoch: Epoch}
finalizedCheckpoint: {rootHex: string; epoch: Epoch},
currentEpoch: Epoch
): Promise<void> {
// Use fork choice to determine the blocks to archive and delete
const finalizedCanonicalBlocks = forkChoice.getAllAncestorBlocks(finalizedCheckpoint.rootHex);
const finalizedNonCanonicalBlocks = forkChoice.getAllNonAncestorBlocks(finalizedCheckpoint.rootHex);

// NOTE: The finalized block will be exactly the first block of `epoch` or previous
const finalizedPostEIP4844 = finalizedCheckpoint.epoch >= config.EIP4844_FORK_EPOCH;

const finalizedCanonicalBlockRoots: BlockRootSlot[] = finalizedCanonicalBlocks.map((block) => ({
slot: block.slot,
root: fromHexString(block.blockRoot),
Expand All @@ -47,6 +55,11 @@ export async function archiveBlocks(
toSlot: finalizedCanonicalBlockRoots[finalizedCanonicalBlockRoots.length - 1].slot,
size: finalizedCanonicalBlockRoots.length,
});

if (finalizedPostEIP4844) {
await migrateBlobsSidecarFromHotToColdDb(config, db, finalizedCanonicalBlockRoots);
logger.verbose("Migrated blobsSidecar from hot DB to cold DB");
}
}

// deleteNonCanonicalBlocks
Expand All @@ -55,9 +68,31 @@ export async function archiveBlocks(
const nonCanonicalBlockRoots = finalizedNonCanonicalBlocks.map((summary) => fromHexString(summary.blockRoot));
if (nonCanonicalBlockRoots.length > 0) {
await db.block.batchDelete(nonCanonicalBlockRoots);
logger.verbose("deleteNonCanonicalBlocks", {
logger.verbose("Deleted non canonical blocks from hot DB", {
slots: finalizedNonCanonicalBlocks.map((summary) => summary.slot).join(","),
});

if (finalizedPostEIP4844) {
await db.blobsSidecar.batchDelete(nonCanonicalBlockRoots);
logger.verbose("Deleted non canonical blobsSider from hot DB");
}
}

// Delete expired blobs
// Keep only `[max(GENESIS_EPOCH, current_epoch - MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS), current_epoch]`
if (finalizedPostEIP4844) {
const blobsSidecarMinEpoch = currentEpoch - config.MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
if (blobsSidecarMinEpoch > 0) {
const slotsToDelete = await db.blobsSidecarArchive.keys({lt: computeStartSlotAtEpoch(blobsSidecarMinEpoch)});
if (slotsToDelete.length > 0) {
await db.blobsSidecarArchive.batchDelete(slotsToDelete);
logger.verbose(
`blobsSidecar prune: batchDelete range ${slotsToDelete[0]}..${slotsToDelete[slotsToDelete.length - 1]}`
);
} else {
logger.verbose(`blobsSidecar prune: no entries before epoch ${blobsSidecarMinEpoch}`);
}
}
}

// Prunning potential checkpoint data
Expand All @@ -79,8 +114,8 @@ async function migrateBlocksFromHotToColdDb(db: IBeaconDb, blocks: BlockRootSlot
// Start from `i=0`: 1st block in iterateAncestorBlocks() is the finalized block itself
// we move it to blockArchive but forkchoice still have it to check next onBlock calls
// the next iterateAncestorBlocks call does not return this block
for (let i = 0; i < blocks.length; i += BATCH_SIZE) {
const toIdx = Math.min(i + BATCH_SIZE, blocks.length);
for (let i = 0; i < blocks.length; i += BLOCK_BATCH_SIZE) {
const toIdx = Math.min(i + BLOCK_BATCH_SIZE, blocks.length);
const canonicalBlocks = blocks.slice(i, toIdx);

// processCanonicalBlocks
Expand Down Expand Up @@ -112,6 +147,39 @@ async function migrateBlocksFromHotToColdDb(db: IBeaconDb, blocks: BlockRootSlot
}
}

async function migrateBlobsSidecarFromHotToColdDb(
config: IChainForkConfig,
db: IBeaconDb,
blocks: BlockRootSlot[]
): Promise<void> {
for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) {
const toIdx = Math.min(i + BLOB_SIDECAR_BATCH_SIZE, blocks.length);
const canonicalBlocks = blocks.slice(i, toIdx);

// processCanonicalBlocks
if (canonicalBlocks.length === 0) return;

// load Buffer instead of ssz deserialized to improve performance
const canonicalBlobsSidecarEntries: IKeyValue<Slot, Uint8Array>[] = await Promise.all(
canonicalBlocks
.filter((block) => config.getForkSeq(block.slot) >= ForkSeq.eip4844)
.map(async (block) => {
const bytes = await db.blobsSidecar.getBinary(block.root);
if (!bytes) {
throw Error(`No blobsSidecar found for slot ${block.slot} root ${toHex(block.root)}`);
}
return {key: block.slot, value: bytes};
})
);

// put to blockArchive db and delete block db
await Promise.all([
db.blobsSidecarArchive.batchPutBinary(canonicalBlobsSidecarEntries),
db.blobsSidecar.batchDelete(canonicalBlocks.map((block) => block.root)),
]);
}
}

/**
* ```
* class SignedBeaconBlock(Container):
Expand Down
10 changes: 9 additions & 1 deletion packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,15 @@ export class Archiver {
try {
const finalizedEpoch = finalized.epoch;
this.logger.verbose("Start processing finalized checkpoint", {epoch: finalizedEpoch});
await archiveBlocks(this.db, this.chain.forkChoice, this.chain.lightClientServer, this.logger, finalized);
await archiveBlocks(
this.chain.config,
this.db,
this.chain.forkChoice,
this.chain.lightClientServer,
this.logger,
finalized,
this.chain.clock.currentEpoch
);

// should be after ArchiveBlocksTask to handle restart cleanly
await this.statesArchiver.maybeArchiveState(finalized);
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/db/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,12 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
async stop(): Promise<void> {
await super.stop();
}

async pruneHotDb(): Promise<void> {
// Prune all hot blobs
await this.blobsSidecar.batchDelete(await this.blobsSidecar.keys());
// Prune all hot blocks
// TODO: Enable once it's deemed safe
// await this.block.batchDelete(await this.block.keys());
}
}
2 changes: 2 additions & 0 deletions packages/beacon-node/src/db/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ export interface IBeaconDb {

backfilledRanges: BackfilledRanges;

pruneHotDb(): Promise<void>;

/** Start the connection to the db instance and open the db store. */
start(): Promise<void>;
/** Stop the connection to the db instance and close the db store. */
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ export class BeaconNode {

// start db if not already started
await db.start();
// Prune hot db repos
// TODO: Should this call be awaited?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably so unless we can be sure the pruning is done by the time we're syncing.

await db.pruneHotDb();

let metrics = null;
if (opts.metrics.enabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {expect} from "chai";
import sinon, {SinonStubbedInstance} from "sinon";
import {ssz} from "@lodestar/types";
import {ForkChoice} from "@lodestar/fork-choice";
import {config} from "@lodestar/config/default";
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {ZERO_HASH_HEX} from "../../../../src/constants/index.js";
import {generateProtoBlock, generateEmptySignedBlock} from "../../../utils/block.js";
Expand Down Expand Up @@ -33,9 +34,18 @@ describe("block archiver task", function () {
);
const canonicalBlocks = [blocks[4], blocks[3], blocks[1], blocks[0]];
const nonCanonicalBlocks = [blocks[2]];
const currentEpoch = 8;
forkChoiceStub.getAllAncestorBlocks.returns(canonicalBlocks);
forkChoiceStub.getAllNonAncestorBlocks.returns(nonCanonicalBlocks);
await archiveBlocks(dbStub, forkChoiceStub, lightclientServer, logger, {epoch: 5, rootHex: ZERO_HASH_HEX});
await archiveBlocks(
config,
dbStub,
forkChoiceStub,
lightclientServer,
logger,
{epoch: 5, rootHex: ZERO_HASH_HEX},
currentEpoch
);

expect(dbStub.blockArchive.batchPutBinary.getCall(0).args[0]).to.deep.equal(
canonicalBlocks.map((summary) => ({
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/test/utils/mocks/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,6 @@ export function getStubbedBeaconDb(): IBeaconDb {
async stop(): Promise<void> {},
/** To inject metrics after CLI initialization */
setMetrics(): void {},
async pruneHotDb(): Promise<void> {},
};
}