Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: consume new state cache apis #6250

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-trans
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";

/**
* Minimum number of epochs between single temp archived states
Expand Down Expand Up @@ -83,13 +84,26 @@ export class StatesArchiver {
* Only the new finalized state is stored to disk
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
const finalizedState = this.regen.getCheckpointStateSync(finalized);
if (!finalizedState) {
throw Error("No state in cache for finalized checkpoint state epoch #" + finalized.epoch);
// starting from Jan 2024, the finalized state could be from disk or in memory
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// state
await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
slot: finalizedStateOrBytes.slot,
root: rootHex,
});
}
await this.db.stateArchive.put(finalizedState.slot, finalizedState);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {finalizedEpoch: finalized.epoch});
}
}

Expand Down
44 changes: 22 additions & 22 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ export async function importBlock(
): Promise<void> {
const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const {block, source} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const {slot: blockSlot} = block.message;
const blockRoot = this.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
const blockEpoch = computeEpochAtSlot(block.message.slot);
const blockEpoch = computeEpochAtSlot(blockSlot);
const parentEpoch = computeEpochAtSlot(parentBlockSlot);
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;
Expand All @@ -87,17 +88,16 @@ export async function importBlock(

// This adds the state necessary to process the next block
// Some block event handlers require state being in state cache so need to do this before emitting EventType.block
this.regen.addPostState(postState);
this.regen.processState(blockRootHex, postState);

this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: block.message.slot, root: blockRootHex});
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});

// We want to import block asap so call all event handler in the next event loop
setTimeout(() => {
const slot = block.message.slot;
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot,
slot: blockSlot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});

Expand All @@ -106,7 +106,7 @@ export async function importBlock(
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
Expand Down Expand Up @@ -171,7 +171,7 @@ export async function importBlock(
correctHead,
missedSlotVote,
blockRootHex,
block.message.slot
blockSlot
);
} catch (e) {
// a block has a lot of attestations and it may has same error, we don't want to log all of them
Expand All @@ -185,15 +185,15 @@ export async function importBlock(
}
} else {
// always log other errors
this.logger.warn("Error processing attestation from block", {slot: block.message.slot}, e as Error);
this.logger.warn("Error processing attestation from block", {slot: blockSlot}, e as Error);
}
}
}

for (const {error, count} of invalidAttestationErrorsByCode.values()) {
this.logger.warn(
"Error processing attestations from block",
{slot: block.message.slot, erroredAttestations: count},
{slot: blockSlot, erroredAttestations: count},
error
);
}
Expand All @@ -214,7 +214,7 @@ export async function importBlock(
// all AttesterSlashings are valid before reaching this
this.forkChoice.onAttesterSlashing(slashing);
} catch (e) {
this.logger.warn("Error processing AttesterSlashing from block", {slot: block.message.slot}, e as Error);
this.logger.warn("Error processing AttesterSlashing from block", {slot: blockSlot}, e as Error);
}
}
}
Expand Down Expand Up @@ -297,7 +297,7 @@ export async function importBlock(
parentBlockSlot
);
} catch (e) {
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: blockSlot}, e as Error);
}
}, 0);
}
Expand Down Expand Up @@ -351,15 +351,15 @@ export async function importBlock(
if (parentEpoch < blockEpoch) {
// current epoch and previous epoch are likely cached in previous states
this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch);
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: block.message.slot});
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: blockSlot});
}

if (block.message.slot % SLOTS_PER_EPOCH === 0) {
if (blockSlot % SLOTS_PER_EPOCH === 0) {
// Cache state to preserve epoch transition work
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
this.regen.addCheckpointState(cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone());

// Note: in-lined code from previos handler of ChainEvent.checkpoint
this.logger.verbose("Checkpoint processed", toCheckpointHex(cp));
Expand Down Expand Up @@ -397,7 +397,7 @@ export async function importBlock(

// Send block events, only for recent enough blocks

if (this.clock.currentSlot - block.message.slot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) {
if (this.clock.currentSlot - blockSlot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) {
// NOTE: Skip looping if there are no listeners from the API
if (this.emitter.listenerCount(routes.events.EventType.voluntaryExit)) {
for (const voluntaryExit of block.message.body.voluntaryExits) {
Expand All @@ -417,10 +417,10 @@ export async function importBlock(
}

// Register stat metrics about the block after importing it
this.metrics?.parentBlockDistance.observe(block.message.slot - parentBlockSlot);
this.metrics?.parentBlockDistance.observe(blockSlot - parentBlockSlot);
this.metrics?.proposerBalanceDeltaAny.observe(fullyVerifiedBlock.proposerBalanceDelta);
this.metrics?.registerImportedBlock(block.message, fullyVerifiedBlock);
if (this.config.getForkSeq(block.message.slot) >= ForkSeq.altair) {
if (this.config.getForkSeq(blockSlot) >= ForkSeq.altair) {
this.metrics?.registerSyncAggregateInBlock(
blockEpoch,
(block as altair.SignedBeaconBlock).message.body.syncAggregate,
Expand All @@ -433,18 +433,18 @@ export async function importBlock(
// Gossip blocks need to be imported as soon as possible, waiting attestations could be processed
// in the next event loop. See https://github.com/ChainSafe/lodestar/issues/4789
setTimeout(() => {
this.reprocessController.onBlockImported({slot: block.message.slot, root: blockRootHex}, advancedSlot);
this.reprocessController.onBlockImported({slot: blockSlot, root: blockRootHex}, advancedSlot);
}, 0);

if (opts.seenTimestampSec !== undefined) {
const recvToImportedBlock = Date.now() / 1000 - opts.seenTimestampSec;
this.metrics?.gossipBlock.receivedToBlockImport.observe(recvToImportedBlock);
this.logger.verbose("Imported block", {slot: block.message.slot, recvToImportedBlock});
this.logger.verbose("Imported block", {slot: blockSlot, recvToImportedBlock});
}

this.logger.verbose("Block processed", {
slot: block.message.slot,
slot: blockSlot,
root: blockRootHex,
delaySec: this.clock.secFromSlot(block.message.slot),
delaySec: this.clock.secFromSlot(blockSlot),
});
}
33 changes: 29 additions & 4 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Clock, ClockEvent, IClock} from "../util/clock.js";
import {ensureDir, writeIfNotExist} from "../util/file.js";
import {isOptimisticBlock} from "../util/forkChoice.js";
import {BufferPool} from "../util/bufferPool.js";
import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData, BlockHash, StateGetOpts} from "./interface.js";
Expand Down Expand Up @@ -80,7 +81,11 @@ import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {ShufflingCache} from "./shufflingCache.js";
import {StateContextCache} from "./stateCache/stateContextCache.js";
import {SeenGossipBlockInput} from "./seenCache/index.js";
import {CheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js";
import {InMemoryCheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js";
import {FIFOBlockStateCache} from "./stateCache/fifoBlockStateCache.js";
import {PersistentCheckpointStateCache} from "./stateCache/persistentCheckpointsCache.js";
import {DbCPStateDatastore} from "./stateCache/datastore/db.js";
import {FileCPStateDatastore} from "./stateCache/datastore/file.js";

/**
* Arbitrary constants, blobs and payloads should be consumed immediately in the same slot
Expand Down Expand Up @@ -238,9 +243,28 @@ export class BeaconChain implements IBeaconChain {
this.pubkey2index = cachedState.epochCtx.pubkey2index;
this.index2pubkey = cachedState.epochCtx.index2pubkey;

const stateCache = new StateContextCache({metrics});
const checkpointStateCache = new CheckpointStateCache({metrics});

const fileDataStore = opts.nHistoricalStatesFileDataStore ?? false;
const stateCache = this.opts.nHistoricalStates
? new FIFOBlockStateCache(this.opts, {metrics})
: new StateContextCache({metrics});
const checkpointStateCache = this.opts.nHistoricalStates
? new PersistentCheckpointStateCache(
{
metrics,
logger,
clock,
shufflingCache: this.shufflingCache,
getHeadState: this.getHeadState.bind(this),
bufferPool: new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics),
datastore: fileDataStore
? // debug option if we want to investigate any issues with the DB
new FileCPStateDatastore()
: // production option
new DbCPStateDatastore(this.db),
},
this.opts
)
: new InMemoryCheckpointStateCache({metrics});
const {checkpoint} = computeAnchorCheckpoint(config, anchorState);
stateCache.add(cachedState);
stateCache.setHeadState(cachedState);
Expand Down Expand Up @@ -330,6 +354,7 @@ export class BeaconChain implements IBeaconChain {

/** Populate in-memory caches with persisted data. Call at least once on startup */
async loadFromDisk(): Promise<void> {
await this.regen.init();
await this.opPool.fromPersisted(this.db);
}

Expand Down
13 changes: 10 additions & 3 deletions packages/beacon-node/src/chain/forkChoice/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
ForkChoiceStore,
ExecutionStatus,
JustifiedBalancesGetter,
ForkChoiceOpts,
ForkChoiceOpts as RealForkChoiceOpts,
} from "@lodestar/fork-choice";
import {
CachedBeaconStateAllForks,
Expand All @@ -21,7 +21,10 @@ import {ChainEventEmitter} from "../emitter.js";
import {ChainEvent} from "../emitter.js";
import {GENESIS_SLOT} from "../../constants/index.js";

export type {ForkChoiceOpts};
export type ForkChoiceOpts = RealForkChoiceOpts & {
// for testing only
forkchoiceConstructor?: typeof ForkChoice;
};

/**
* Fork Choice extended with a ChainEventEmitter
Expand All @@ -47,7 +50,11 @@ export function initializeForkChoice(

const justifiedBalances = getEffectiveBalanceIncrementsZeroInactive(state);

return new ForkChoice(
// forkchoiceConstructor is only used for some test cases
// production code use ForkChoice constructor directly
const forkchoiceConstructor = opts.forkchoiceConstructor ?? ForkChoice;

return new forkchoiceConstructor(
config,

new ForkChoiceStore(
Expand Down
11 changes: 11 additions & 0 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import {ArchiverOpts} from "./archiver/index.js";
import {ForkChoiceOpts} from "./forkChoice/index.js";
import {LightClientServerOpts} from "./lightClient/index.js";
import {ShufflingCacheOpts} from "./shufflingCache.js";
import {DEFAULT_MAX_BLOCK_STATES, FIFOBlockStateCacheOpts} from "./stateCache/fifoBlockStateCache.js";
import {PersistentCheckpointStateCacheOpts} from "./stateCache/persistentCheckpointsCache.js";
import {DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY} from "./stateCache/persistentCheckpointsCache.js";

export type IChainOptions = BlockProcessOpts &
PoolOpts &
SeenCacheOpts &
ForkChoiceOpts &
ArchiverOpts &
FIFOBlockStateCacheOpts &
PersistentCheckpointStateCacheOpts &
ShufflingCacheOpts &
LightClientServerOpts & {
blsVerifyAllMainThread?: boolean;
Expand All @@ -30,6 +35,8 @@ export type IChainOptions = BlockProcessOpts &
trustedSetup?: string;
broadcastValidationStrictness?: string;
minSameMessageSignatureSetsToBatch: number;
nHistoricalStates?: boolean;
nHistoricalStatesFileDataStore?: boolean;
};

export type BlockProcessOpts = {
Expand Down Expand Up @@ -102,4 +109,8 @@ export const defaultChainOptions: IChainOptions = {
// batching too much may block the I/O thread so if useWorker=false, suggest this value to be 32
// since this batch attestation work is designed to work with useWorker=true, make this the lowest value
minSameMessageSignatureSetsToBatch: 2,
nHistoricalStates: false,
nHistoricalStatesFileDataStore: false,
maxBlockStates: DEFAULT_MAX_BLOCK_STATES,
maxCPStateEpochsInMemory: DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY,
};
4 changes: 3 additions & 1 deletion packages/beacon-node/src/chain/regen/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum RegenErrorCode {
TOO_MANY_BLOCK_PROCESSED = "REGEN_ERROR_TOO_MANY_BLOCK_PROCESSED",
BLOCK_NOT_IN_DB = "REGEN_ERROR_BLOCK_NOT_IN_DB",
STATE_TRANSITION_ERROR = "REGEN_ERROR_STATE_TRANSITION_ERROR",
INVALID_STATE_ROOT = "REGEN_ERROR_INVALID_STATE_ROOT",
}

export type RegenErrorType =
Expand All @@ -17,7 +18,8 @@ export type RegenErrorType =
| {code: RegenErrorCode.NO_SEED_STATE}
| {code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED; stateRoot: RootHex | Root}
| {code: RegenErrorCode.BLOCK_NOT_IN_DB; blockRoot: RootHex | Root}
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error};
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error}
| {code: RegenErrorCode.INVALID_STATE_ROOT; slot: Slot; expected: RootHex; actual: RootHex};

export class RegenError extends Error {
type: RegenErrorType;
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ export interface IStateRegenerator extends IStateRegeneratorInternal {
dropCache(): void;
dumpCacheSummary(): routes.lodestar.StateCacheItem[];
getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null;
getCheckpointStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null>;
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null;
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null;
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void;
pruneOnFinalized(finalizedEpoch: Epoch): void;
addPostState(postState: CachedBeaconStateAllForks): void;
processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void;
addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void;
updateHeadState(newHeadStateRoot: RootHex, maybeHeadState: CachedBeaconStateAllForks): void;
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
Expand Down
Loading
Loading