Commit

Merge 8ad8647 into 20c18ad
nazarhussain authored Aug 15, 2024
2 parents 20c18ad + 8ad8647 commit 28d7b6c
Showing 47 changed files with 1,409 additions and 437 deletions.
49 changes: 49 additions & 0 deletions docs/pages/contribution/advanced-topics/historical-state-regen.md
@@ -0,0 +1,49 @@
---
title: Understanding Historical State Regeneration
---

# Understanding Historical State Regeneration

To run a blockchain client and establish consensus, we only need the latest headers and fork choice data. This operation does not require historical data, especially for epochs that are already finalized. Storing the full state for every finalized slot greatly increases the storage requirements and is not suitable for running a node over a long period of time.

## Solution

To overcome the storage problem for archive nodes, we implemented the following algorithm to store and fetch historical states.

**Approach**

Assume we have a chain with a state object at every slot and the diff layer configuration `1,2,3,5`. Assuming 8 slots per epoch, this layer configuration implies:

1. We store a full snapshot every 5th epoch (every 40 slots).
2. We take a diff every epoch, every 2nd epoch, and every 3rd epoch (every 8, 16, and 24 slots).

Please see the following table for a better understanding of these layers.

![historical-regen](docs/static/images/historical-regen/historical-regen.png)

These are the rules we follow (a small sketch of the layer selection follows this list):

1. If the frequencies of two layers collide on one slot, we use the lower layer. This is shown as a black border around the slot.
2. The lowest layer is called the snapshot layer, and for its slots we store the fully serialized bytes of the state object.
3. We always try to find the shortest hierarchical path to reach the snapshot layer, starting from the topmost layer.
4. For the rest of the layers, we recursively compute the binary difference and store only the diffs on the upper layers.
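
As a rough illustration of rules 1 and 2 (not the actual Lodestar API; the function name and shape below are hypothetical), the layer a slot belongs to can be picked by checking which layer frequencies divide the slot and taking the lowest matching layer:

```ts
/**
 * `layerEpochs` lists the diff layer frequencies in epochs, from the top layer
 * down to the snapshot layer, e.g. [1, 2, 3, 5] as in the example above.
 * Returns the index of the lowest layer that collides on `slot`,
 * or -1 if the slot is not on any layer boundary.
 */
function getLayerForSlot(slot: number, layerEpochs: number[], slotsPerEpoch = 8): number {
  let layer = -1;
  for (let i = 0; i < layerEpochs.length; i++) {
    const everySlots = layerEpochs[i] * slotsPerEpoch;
    // When several layers collide on the same slot, the last (lowest) match wins
    if (slot % everySlots === 0) layer = i;
  }
  return layer;
}

getLayerForSlot(0, [1, 2, 3, 5]); // 3 -> all layers collide, use the snapshot layer
getLayerForSlot(16, [1, 2, 3, 5]); // 1 -> the 1-epoch and 2-epoch layers collide, use the 2-epoch layer
```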

Let's take a few scenarios:

1. For slot `0` all layers collide, so we use the lowest layer, which is the snapshot layer. For slot `0` we therefore store and fetch the snapshot.
2. For slots `0-7` within the first epoch there is no intermediary layer, so we read the snapshot from slot `0`.
3. For slots `8-15` the path we follow is `8 -> 0`, e.g. for slot `12` we apply the diff from slot `8` on the snapshot from slot `0`, then replay the blocks from slots `9-12`.
4. For slot `18` the shortest path to the nearest snapshot is `16 -> 0`, and the rest follows the same as above.
5. For slot `34` the path we follow is `32 -> 24 -> 0`.
6. For slot `41` the path to the nearest snapshot is just one layer, directly at slot `40`.

As you can see, with this approach we can find a short path with a small number of diffs to apply, regenerate the nearest full state, and reduce the number of blocks we have to replay to reach the actual slot.
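
To make the path selection concrete, here is a minimal sketch of the idea, assuming the layer semantics described above; `getDiffPath` is a hypothetical helper, not the Lodestar implementation. It walks from the snapshot layer up through the finer layers, collecting the latest boundary slot of each layer at or below the target:

```ts
/**
 * Returns the slots to read, starting from the snapshot and going up through
 * the diff layers, to regenerate the state nearest to `targetSlot`. Blocks from
 * the last returned slot up to `targetSlot` still have to be replayed.
 * `layerEpochs` lists the layer frequencies in epochs, the last entry being the
 * snapshot layer, e.g. [1, 2, 3, 5].
 */
function getDiffPath(targetSlot: number, layerEpochs: number[], slotsPerEpoch = 8): number[] {
  const path: number[] = [];
  let floor = 0;
  // Start at the snapshot layer (last entry) and move up to the finer layers
  for (let i = layerEpochs.length - 1; i >= 0; i--) {
    const everySlots = layerEpochs[i] * slotsPerEpoch;
    const boundary = Math.floor(targetSlot / everySlots) * everySlots;
    // Only include a layer if its boundary moves us closer to the target
    if (boundary > floor || path.length === 0) {
      path.push(boundary);
      floor = boundary;
    }
  }
  return path;
}

getDiffPath(34, [1, 2, 3, 5]); // [0, 24, 32] -> snapshot at 0, diffs at 24 and 32, replay blocks 33-34
getDiffPath(41, [1, 2, 3, 5]); // [40]       -> snapshot at 40, no diffs, replay block 41
getDiffPath(12, [1, 2, 3, 5]); // [0, 8]     -> snapshot at 0, diff at 8, replay blocks 9-12
```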


**Constants**

The following constant values are used in the implementation.

| Name | Value | Description |
| -------------------------- | ----- | ----------------------------------------------- |
| DEFAULT_DIFF_LAYERS | 4, 64, 256, 1024 | Default value for layers |
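
Assuming these defaults are epoch frequencies interpreted the same way as the `1,2,3,5` example above (with the largest value being the snapshot layer) and 32 slots per epoch on mainnet, the hypothetical `getDiffPath` helper from the previous sketch would resolve paths like this:

```ts
// With the defaults, diffs are taken every 4, 64 and 256 epochs and a snapshot
// every 1024 epochs, i.e. every 128, 2048, 8192 and 32768 slots at 32 slots/epoch.
const DEFAULT_DIFF_LAYERS = [4, 64, 256, 1024];

getDiffPath(40_000, DEFAULT_DIFF_LAYERS, 32);
// [32768, 38912, 39936] -> snapshot at slot 32768, then the 64-epoch-layer diff
// at 38912, then the 4-epoch-layer diff at 39936, then replay blocks 39937-40000
```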
2 changes: 2 additions & 0 deletions packages/beacon-node/package.json
@@ -149,6 +149,8 @@
"strict-event-emitter-types": "^2.0.0",
"systeminformation": "^5.22.9",
"uint8arraylist": "^2.4.7",
"xdelta3-wasm": "^1.0.0",
"vcdiff-wasm": "^1.0.10",
"xxhash-wasm": "1.0.2"
},
"devDependencies": {
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/lodestar/index.ts
@@ -183,7 +183,7 @@ export function getLodestarApi({
},

async dumpDbStateIndex() {
return {data: await db.stateArchive.dumpRootIndexEntries()};
return {data: await db.stateSnapshotArchive.dumpRootIndexEntries()};
},
};
}
114 changes: 18 additions & 96 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
@@ -1,24 +1,12 @@
import {Logger} from "@lodestar/utils";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {Slot, Epoch} from "@lodestar/types";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {Logger} from "@lodestar/utils";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";

/**
* Minimum number of epochs between single temp archived states
* These states will be pruned once a new state is persisted
*/
const PERSIST_TEMP_STATE_EVERY_EPOCHS = 32;
import {IHistoricalStateRegen} from "../historicalState/types.js";

export interface StatesArchiverOpts {
/**
* Minimum number of epochs between archived states
*/
archiveStateEpochFrequency: number;
}
export interface StatesArchiverOpts {}

/**
* Archives finalized states from active bucket to archive bucket.
@@ -27,56 +15,15 @@ export interface StatesArchiverOpts {
*/
export class StatesArchiver {
constructor(
private readonly historicalSates: IHistoricalStateRegen | undefined,
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts
) {}

/**
* Persist states every some epochs to
* - Minimize disk space, storing the least states possible
* - Minimize the sync progress lost on unexpected crash, storing temp state every few epochs
*
* At epoch `e` there will be states peristed at intervals of `PERSIST_STATE_EVERY_EPOCHS` = 32
* and one at `PERSIST_TEMP_STATE_EVERY_EPOCHS` = 1024
* ```
* | | | .
* epoch - 1024*2 epoch - 1024 epoch - 32 epoch
* ```
*/
async maybeArchiveState(finalized: CheckpointWithHex): Promise<void> {
const lastStoredSlot = await this.db.stateArchive.lastKey();
const lastStoredEpoch = computeEpochAtSlot(lastStoredSlot ?? 0);
const {archiveStateEpochFrequency} = this.opts;

if (finalized.epoch - lastStoredEpoch >= Math.min(PERSIST_TEMP_STATE_EVERY_EPOCHS, archiveStateEpochFrequency)) {
await this.archiveState(finalized);

// Only check the current and previous intervals
const minEpoch = Math.max(
0,
(Math.floor(finalized.epoch / archiveStateEpochFrequency) - 1) * archiveStateEpochFrequency
);

const storedStateSlots = await this.db.stateArchive.keys({
lt: computeStartSlotAtEpoch(finalized.epoch),
gte: computeStartSlotAtEpoch(minEpoch),
});

const statesSlotsToDelete = computeStateSlotsToDelete(storedStateSlots, archiveStateEpochFrequency);
if (statesSlotsToDelete.length > 0) {
await this.db.stateArchive.batchDelete(statesSlotsToDelete);
}

// More logs to investigate the rss spike issue https://github.com/ChainSafe/lodestar/issues/5591
this.logger.verbose("Archived state completed", {
finalizedEpoch: finalized.epoch,
minEpoch,
storedStateSlots: storedStateSlots.join(","),
statesSlotsToDelete: statesSlotsToDelete.join(","),
});
}
await this.archiveState(finalized);
}

/**
Expand All @@ -85,44 +32,19 @@ export class StatesArchiver {
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
// starting from Mar 2024, the finalized state could be from disk or in memory
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// state
await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
slot: finalizedStateOrBytes.slot,
root: rootHex,
});
const state = await this.regen.getCheckpointStateOrBytes(finalized);
if (state === null) {
this.logger.warn("Checkpoint state not available to archive.", {epoch: finalized.epoch, root: finalized.rootHex});
return;
}
}
}

/**
* Keeps first epoch per interval of persistEveryEpochs, deletes the rest
*/
export function computeStateSlotsToDelete(storedStateSlots: Slot[], persistEveryEpochs: Epoch): Slot[] {
const persistEverySlots = persistEveryEpochs * SLOTS_PER_EPOCH;
const intervalsWithStates = new Set<number>();
const stateSlotsToDelete = new Set<number>();

for (const slot of storedStateSlots) {
const interval = Math.floor(slot / persistEverySlots);
if (intervalsWithStates.has(interval)) {
stateSlotsToDelete.add(slot);
} else {
intervalsWithStates.add(interval);
if (Array.isArray(state) && state.constructor === Uint8Array) {
return this.historicalSates?.storeHistoricalState(computeStartSlotAtEpoch(finalized.epoch), state);
}
}

return Array.from(stateSlotsToDelete.values());
return this.historicalSates?.storeHistoricalState(
(state as CachedBeaconStateAllForks).slot,
(state as CachedBeaconStateAllForks).serialize()
);
}
}
4 changes: 3 additions & 1 deletion packages/beacon-node/src/chain/archiver/index.ts
@@ -4,6 +4,7 @@ import {IBeaconDb} from "../../db/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {IBeaconChain} from "../interface.js";
import {ChainEvent} from "../emitter.js";
import {DiffLayers} from "../historicalState/diffLayers.js";
import {StatesArchiver, StatesArchiverOpts} from "./archiveStates.js";
import {archiveBlocks} from "./archiveBlocks.js";

@@ -43,12 +44,13 @@
constructor(
private readonly db: IBeaconDb,
private readonly chain: IBeaconChain,
private readonly diffLayers: DiffLayers,
private readonly logger: Logger,
signal: AbortSignal,
opts: ArchiverOpts
) {
this.archiveBlobEpochs = opts.archiveBlobEpochs;
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts);
this.statesArchiver = new StatesArchiver(chain.historicalStateRegen, chain.regen, db, logger, opts);
this.prevFinalized = chain.forkChoice.getFinalizedCheckpoint();
this.jobQueue = new JobItemQueue<[CheckpointWithHex], void>(this.processFinalizedCheckpoint, {
maxLength: PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN,
12 changes: 7 additions & 5 deletions packages/beacon-node/src/chain/chain.ts
@@ -89,7 +89,6 @@ import {BlockAttributes, produceBlockBody, produceCommonBlockBody} from "./produ
import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockInput} from "./blocks/types.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {HistoricalStateRegen} from "./historicalState/index.js";
import {BlockRewards, computeBlockRewards} from "./rewards/blockRewards.js";
import {ShufflingCache} from "./shufflingCache.js";
import {BlockStateCacheImpl} from "./stateCache/blockStateCacheImpl.js";
@@ -101,6 +100,8 @@ import {DbCPStateDatastore} from "./stateCache/datastore/db.js";
import {FileCPStateDatastore} from "./stateCache/datastore/file.js";
import {SyncCommitteeRewards, computeSyncCommitteeRewards} from "./rewards/syncCommitteeRewards.js";
import {AttestationsRewards, computeAttestationsRewards} from "./rewards/attestationsRewards.js";
import {DiffLayers} from "./historicalState/diffLayers.js";
import {IHistoricalStateRegen} from "./historicalState/types.js";

/**
* Arbitrary constants, blobs and payloads should be consumed immediately in the same slot
@@ -129,7 +130,7 @@ export class BeaconChain implements IBeaconChain {
readonly regen: QueuedStateRegenerator;
readonly lightClientServer?: LightClientServer;
readonly reprocessController: ReprocessController;
readonly historicalStateRegen?: HistoricalStateRegen;
readonly historicalStateRegen?: IHistoricalStateRegen;

// Ops pool
readonly attestationPool: AttestationPool;
@@ -200,7 +201,7 @@
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
executionBuilder?: IExecutionBuilder;
historicalStateRegen?: HistoricalStateRegen;
historicalStateRegen?: IHistoricalStateRegen;
}
) {
this.opts = opts;
@@ -324,7 +325,8 @@
this.bls = bls;
this.emitter = emitter;

this.archiver = new Archiver(db, this, logger, signal, opts);
// TODO: Decouple DiffLayers from archiver
this.archiver = new Archiver(db, this, new DiffLayers(), logger, signal, opts);
// always run PrepareNextSlotScheduler except for fork_choice spec tests
if (!opts?.disablePrepareNextSlot) {
new PrepareNextSlotScheduler(this, this.config, metrics, this.logger, signal);
@@ -511,7 +513,7 @@
};
}

const data = await this.db.stateArchive.getByRoot(fromHexString(stateRoot));
const data = await this.db.stateSnapshotArchive.getByRoot(fromHexString(stateRoot));
return data && {state: data, executionOptimistic: false, finalized: true};
}
