Skip to content

Commit

Permalink
Merge 2e9dd45 into 8bd86ae
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion authored Dec 7, 2022
2 parents 8bd86ae + 2e9dd45 commit e0b4d2a
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 9 deletions.
82 changes: 75 additions & 7 deletions packages/beacon-node/src/chain/archiver/archiveBlocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ import {fromHexString} from "@chainsafe/ssz";
import {Epoch, Slot} from "@lodestar/types";
import {IForkChoice} from "@lodestar/fork-choice";
import {ILogger, toHex} from "@lodestar/utils";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {IKeyValue} from "@lodestar/db";
import {IChainForkConfig} from "@lodestar/config";
import {IBeaconDb} from "../../db/index.js";
import {BlockArchiveBatchPutBinaryItem} from "../../db/repositories/index.js";
import {LightClientServer} from "../lightClient/index.js";

// Process in chunks to avoid OOM
// this number of blocks per chunk is tested in e2e test blockArchive.test.ts
// TODO: Review after merge since the size of blocks will increase significantly
const BATCH_SIZE = 256;
const BLOCK_BATCH_SIZE = 256;
const BLOB_SIDECAR_BATCH_SIZE = 32;

type BlockRootSlot = {slot: Slot; root: Uint8Array};

Expand All @@ -25,16 +28,21 @@ type BlockRootSlot = {slot: Slot; root: Uint8Array};
the next run should not reprocess finalized block of this run.
*/
export async function archiveBlocks(
config: IChainForkConfig,
db: IBeaconDb,
forkChoice: IForkChoice,
lightclientServer: LightClientServer,
logger: ILogger,
finalizedCheckpoint: {rootHex: string; epoch: Epoch}
finalizedCheckpoint: {rootHex: string; epoch: Epoch},
currentEpoch: Epoch
): Promise<void> {
// Use fork choice to determine the blocks to archive and delete
const finalizedCanonicalBlocks = forkChoice.getAllAncestorBlocks(finalizedCheckpoint.rootHex);
const finalizedNonCanonicalBlocks = forkChoice.getAllNonAncestorBlocks(finalizedCheckpoint.rootHex);

// NOTE: The finalized block will be exactly the first block of `epoch` or previous
const finalizedPostEIP4844 = finalizedCheckpoint.epoch >= config.EIP4844_FORK_EPOCH;

const finalizedCanonicalBlockRoots: BlockRootSlot[] = finalizedCanonicalBlocks.map((block) => ({
slot: block.slot,
root: fromHexString(block.blockRoot),
Expand All @@ -47,6 +55,11 @@ export async function archiveBlocks(
toSlot: finalizedCanonicalBlockRoots[finalizedCanonicalBlockRoots.length - 1].slot,
size: finalizedCanonicalBlockRoots.length,
});

if (finalizedPostEIP4844) {
await migrateBlobsSidecarFromHotToColdDb(config, db, finalizedCanonicalBlockRoots);
logger.verbose("Migrated blobsSidecar from hot DB to cold DB");
}
}

// deleteNonCanonicalBlocks
Expand All @@ -55,9 +68,31 @@ export async function archiveBlocks(
const nonCanonicalBlockRoots = finalizedNonCanonicalBlocks.map((summary) => fromHexString(summary.blockRoot));
if (nonCanonicalBlockRoots.length > 0) {
await db.block.batchDelete(nonCanonicalBlockRoots);
logger.verbose("deleteNonCanonicalBlocks", {
logger.verbose("Deleted non canonical blocks from hot DB", {
slots: finalizedNonCanonicalBlocks.map((summary) => summary.slot).join(","),
});

if (finalizedPostEIP4844) {
await db.blobsSidecar.batchDelete(nonCanonicalBlockRoots);
logger.verbose("Deleted non canonical blobsSider from hot DB");
}
}

// Delete expired blobs
// Keep only `[max(GENESIS_EPOCH, current_epoch - MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS), current_epoch]`
if (finalizedPostEIP4844) {
const blobsSidecarMinEpoch = currentEpoch - config.MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS;
if (blobsSidecarMinEpoch > 0) {
const slotsToDelete = await db.blobsSidecarArchive.keys({lt: computeStartSlotAtEpoch(blobsSidecarMinEpoch)});
if (slotsToDelete.length > 0) {
await db.blobsSidecarArchive.batchDelete(slotsToDelete);
logger.verbose(
`blobsSidecar prune: batchDelete range ${slotsToDelete[0]}..${slotsToDelete[slotsToDelete.length - 1]}`
);
} else {
logger.verbose(`blobsSidecar prune: no entries before epoch ${blobsSidecarMinEpoch}`);
}
}
}

// Pruning potential checkpoint data
Expand All @@ -79,8 +114,8 @@ async function migrateBlocksFromHotToColdDb(db: IBeaconDb, blocks: BlockRootSlot
// Start from `i=0`: 1st block in iterateAncestorBlocks() is the finalized block itself
// we move it to blockArchive but forkchoice still have it to check next onBlock calls
// the next iterateAncestorBlocks call does not return this block
for (let i = 0; i < blocks.length; i += BATCH_SIZE) {
const toIdx = Math.min(i + BATCH_SIZE, blocks.length);
for (let i = 0; i < blocks.length; i += BLOCK_BATCH_SIZE) {
const toIdx = Math.min(i + BLOCK_BATCH_SIZE, blocks.length);
const canonicalBlocks = blocks.slice(i, toIdx);

// processCanonicalBlocks
Expand Down Expand Up @@ -112,6 +147,39 @@ async function migrateBlocksFromHotToColdDb(db: IBeaconDb, blocks: BlockRootSlot
}
}

/**
 * Migrate canonical blobs sidecars from the hot `blobsSidecar` repo (keyed by block root)
 * to the cold `blobsSidecarArchive` repo (keyed by slot), deleting the hot entries afterwards.
 * Processes in chunks of BLOB_SIDECAR_BATCH_SIZE to bound memory usage, since sidecars are large.
 *
 * @param config - fork config, used to skip pre-eip4844 blocks (they have no sidecar)
 * @param db - beacon DB with hot `blobsSidecar` and cold `blobsSidecarArchive` repos
 * @param blocks - finalized canonical blocks to migrate, as (slot, root) pairs
 * @throws Error if a post-eip4844 canonical block has no sidecar in the hot DB
 */
async function migrateBlobsSidecarFromHotToColdDb(
  config: IChainForkConfig,
  db: IBeaconDb,
  blocks: BlockRootSlot[]
): Promise<void> {
  for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) {
    const toIdx = Math.min(i + BLOB_SIDECAR_BATCH_SIZE, blocks.length);
    const canonicalBlocks = blocks.slice(i, toIdx);
    // NOTE: within loop bounds `canonicalBlocks` is never empty, so no emptiness
    // guard is needed here (a `return` guard would wrongly skip remaining batches).

    // load Buffer instead of ssz deserialized to improve performance
    const canonicalBlobsSidecarEntries: IKeyValue<Slot, Uint8Array>[] = await Promise.all(
      canonicalBlocks
        // Only blocks at or after the eip4844 fork have a blobs sidecar
        .filter((block) => config.getForkSeq(block.slot) >= ForkSeq.eip4844)
        .map(async (block) => {
          const bytes = await db.blobsSidecar.getBinary(block.root);
          if (!bytes) {
            throw Error(`No blobsSidecar found for slot ${block.slot} root ${toHex(block.root)}`);
          }
          return {key: block.slot, value: bytes};
        })
    );

    // put to blobsSidecarArchive db and delete from hot blobsSidecar db
    await Promise.all([
      db.blobsSidecarArchive.batchPutBinary(canonicalBlobsSidecarEntries),
      db.blobsSidecar.batchDelete(canonicalBlocks.map((block) => block.root)),
    ]);
  }
}

/**
* ```
* class SignedBeaconBlock(Container):
Expand Down
10 changes: 9 additions & 1 deletion packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,15 @@ export class Archiver {
try {
const finalizedEpoch = finalized.epoch;
this.logger.verbose("Start processing finalized checkpoint", {epoch: finalizedEpoch});
await archiveBlocks(this.db, this.chain.forkChoice, this.chain.lightClientServer, this.logger, finalized);
await archiveBlocks(
this.chain.config,
this.db,
this.chain.forkChoice,
this.chain.lightClientServer,
this.logger,
finalized,
this.chain.clock.currentEpoch
);

// should be after ArchiveBlocksTask to handle restart cleanly
await this.statesArchiver.maybeArchiveState(finalized);
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/db/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,12 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
async stop(): Promise<void> {
await super.stop();
}

async pruneHotDb(): Promise<void> {
// Prune all hot blobs
await this.blobsSidecar.batchDelete(await this.blobsSidecar.keys());
// Prune all hot blocks
// TODO: Enable once it's deemed safe
// await this.block.batchDelete(await this.block.keys());
}
}
2 changes: 2 additions & 0 deletions packages/beacon-node/src/db/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ export interface IBeaconDb {

backfilledRanges: BackfilledRanges;

pruneHotDb(): Promise<void>;

/** Start the connection to the db instance and open the db store. */
start(): Promise<void>;
/** Stop the connection to the db instance and close the db store. */
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ export class BeaconNode {

// start db if not already started
await db.start();
// Prune hot db repos
// TODO: Should this call be awaited?
await db.pruneHotDb();

let metrics = null;
if (opts.metrics.enabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {expect} from "chai";
import sinon, {SinonStubbedInstance} from "sinon";
import {ssz} from "@lodestar/types";
import {ForkChoice} from "@lodestar/fork-choice";
import {config} from "@lodestar/config/default";
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {ZERO_HASH_HEX} from "../../../../src/constants/index.js";
import {generateProtoBlock, generateEmptySignedBlock} from "../../../utils/block.js";
Expand Down Expand Up @@ -33,9 +34,18 @@ describe("block archiver task", function () {
);
const canonicalBlocks = [blocks[4], blocks[3], blocks[1], blocks[0]];
const nonCanonicalBlocks = [blocks[2]];
const currentEpoch = 8;
forkChoiceStub.getAllAncestorBlocks.returns(canonicalBlocks);
forkChoiceStub.getAllNonAncestorBlocks.returns(nonCanonicalBlocks);
await archiveBlocks(dbStub, forkChoiceStub, lightclientServer, logger, {epoch: 5, rootHex: ZERO_HASH_HEX});
await archiveBlocks(
config,
dbStub,
forkChoiceStub,
lightclientServer,
logger,
{epoch: 5, rootHex: ZERO_HASH_HEX},
currentEpoch
);

expect(dbStub.blockArchive.batchPutBinary.getCall(0).args[0]).to.deep.equal(
canonicalBlocks.map((summary) => ({
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/test/utils/mocks/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,6 @@ export function getStubbedBeaconDb(): IBeaconDb {
async stop(): Promise<void> {},
/** To inject metrics after CLI initialization */
setMetrics(): void {},
async pruneHotDb(): Promise<void> {},
};
}

0 comments on commit e0b4d2a

Please sign in to comment.