Skip to content

Commit

Permalink
Merge 16f8c7c into 2ee8ac6
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths authored Apr 5, 2024
2 parents 2ee8ac6 + 16f8c7c commit 9f75183
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ export const DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY = 2;
* ╚════════════════════════════════════╝═══════════════╝
*
* The "in memory" checkpoint states are similar to the old implementation: we have both Previous Root Checkpoint State and Current Root Checkpoint State per epoch.
* However in the "persisted to db or fs" part, we usually only persist 1 checkpoint state per epoch, the one that could potentially be justified/finalized later
* based on the view of blocks.
* However in the "persisted to db or fs" part
* - if there is no reorg, we only store 1 checkpoint state per epoch, the one that could potentially be justified/finalized later based on the view of the state
* - if there is reorg, we may store >=2 checkpoint states per epoch, including any checkpoints with unknown roots to the processed state
* - the goal is to make sure we can regen any states later if needed, and we have the checkpoint state that could be justified/finalized later
*/
export class PersistentCheckpointStateCache implements CheckpointStateCache {
private readonly cache: MapTracker<CacheKey, CacheItem>;
Expand All @@ -101,6 +103,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
private preComputedCheckpoint: string | null = null;
private preComputedCheckpointHits: number | null = null;
private readonly maxEpochsInMemory: number;
// only for testing, default false for production
private readonly processLateBlock: boolean;
private readonly datastore: CPStateDatastore;
private readonly shufflingCache: ShufflingCache;
Expand Down Expand Up @@ -476,9 +479,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* - 1 then we'll persist {root: b1, epoch n-1} checkpoint state to disk. Note that at epoch n there is both {root: b0, epoch: n} and {root: c0, epoch: n} checkpoint states in memory
* - 2 then we'll persist {root: b2, epoch n-2} checkpoint state to disk, there are also 2 checkpoint states in memory at epoch n, same to the above (maxEpochsInMemory=1)
*
* As of Jan 2024, it takes 1.2s to persist a holesky state on fast server. TODO:
* - improve state serialization time
* - or research how to only store diff against the finalized state
* As of Mar 2024, it takes <=350ms to persist a holesky state on fast server
*/
async processState(blockRootHex: RootHex, state: CachedBeaconStateAllForks): Promise<number> {
let persistCount = 0;
Expand Down Expand Up @@ -602,7 +603,41 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
}

/**
* Prune or persist checkpoint states in an epoch, see the description in `processState()` function
* Prune or persist checkpoint states in an epoch
* 1) If there is 1 checkpoint state with known root, persist it. This is when there is skipped slot at block 0 of epoch
* slot: n
* |-----------------------|-----------------------|
* PRCS root |
*
* 2) If there are 2 checkpoint states, PRCS and CRCS and both roots are known to this state, persist CRCS. If the block is reorged,
* PRCS is regen and populated to this cache again.
* slot: n
* |-----------------------|-----------------------|
* PRCS root - prune |
* CRCS root - persist |
*
* 3) If there are any roots that unknown to this state, persist their cp state. This is to handle the current block is reorged later
*
* 4) (derived from above) If there are 2 checkpoint states, PRCS and an unknown root, persist both.
* - In the example below block slot (n + 1) reorged n
* - If we process state n + 1, CRCS is unknown to it
* - we need to also store CRCS to handle the case (n+2) switches to n again
*
* PRCS - persist
* | processState()
* | |
* -------------n+1
* / |
* n-1 ------n------------n+2
* |
* CRCS - persist
*
* - PRCS is the checkpoint state that could be justified/finalized later based on the view of the state
* - unknown root checkpoint state is persisted to handle the reorg back to that branch later
*
* Performance note:
* - In normal condition, we persist 1 checkpoint state per epoch.
* - In reorged condition, we may persist multiple (most likely 2) checkpoint states per epoch.
*/
private async processPastEpoch(
blockRootHex: RootHex,
Expand All @@ -614,14 +649,29 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
const epochBoundaryRoot =
epochBoundarySlot === state.slot ? fromHexString(blockRootHex) : getBlockRootAtSlot(state, epochBoundarySlot);
const epochBoundaryHex = toHexString(epochBoundaryRoot);
const prevEpochRoot = toHexString(getBlockRootAtSlot(state, epochBoundarySlot - 1));

// for each epoch, usually there are 2 rootHexes respective to the 2 checkpoint states: Previous Root Checkpoint State and Current Root Checkpoint State
const cpRootHexes = this.epochIndex.get(epoch) ?? [];
const persistedRootHexes = new Set<RootHex>();

// 1) if there is no CRCS, persist PRCS (block 0 of epoch is skipped). In this case prevEpochRoot === epochBoundaryHex
// 2) if there are PRCS and CRCS, persist CRCS => persist CRCS
// => this is simplified to always persist epochBoundaryHex
persistedRootHexes.add(epochBoundaryHex);

// 3) persist any states with unknown roots to this state
for (const rootHex of cpRootHexes) {
if (rootHex !== epochBoundaryHex && rootHex !== prevEpochRoot) {
persistedRootHexes.add(rootHex);
}
}

// for each epoch, usually there are 2 rootHex respective to the 2 checkpoint states: Previous Root Checkpoint State and Current Root Checkpoint State
for (const rootHex of this.epochIndex.get(epoch) ?? []) {
for (const rootHex of cpRootHexes) {
const cpKey = toCacheKey({epoch: epoch, rootHex});
const cacheItem = this.cache.get(cpKey);

if (cacheItem !== undefined && isInMemoryCacheItem(cacheItem)) {
// this is state in memory, we don't care if the checkpoint state is already persisted
let {persistedKey} = cacheItem;
const {state} = cacheItem;
const logMeta = {
Expand All @@ -631,14 +681,14 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
persistedKey: persistedKey ? toHexString(persistedKey) : "",
};

if (rootHex === epochBoundaryHex) {
if (persistedRootHexes.has(rootHex)) {
if (persistedKey) {
// no need to persist
// we don't care if the checkpoint state is already persisted
this.logger.verbose("Pruned checkpoint state from memory but no need to persist", logMeta);
} else {
// persist and do not update epochIndex
this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0);
const cpPersist = {epoch: epoch, root: epochBoundaryRoot};
const cpPersist = {epoch: epoch, root: fromHexString(rootHex)};
{
const timer = this.metrics?.stateSerializeDuration.startTimer();
// automatically free the buffer pool after this scope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ describe(
maxBlockStates: number;
maxCPStateEpochsInMemory: number;
reloadCount: number;
// total persist count, to compare to metrics
persistCount: number;
numStatesInMemory: number;
// number of states persisted at the end of test
numStatesPersisted: number;
numEpochsInMemory: number;
numEpochsPersisted: number;
Expand Down Expand Up @@ -162,7 +164,7 @@ describe(
* 16 ^ ^ ^ ^ ^
* 23 24 25 26 27
* ^
* PRCS at epoch 3 is persisted, CRCS is pruned
* both PRCS and CRCS are persisted
*/
{
name: "maxCPStateEpochsInMemory=1, reorg last slot of previous epoch",
Expand All @@ -172,12 +174,12 @@ describe(
maxCPStateEpochsInMemory: 1,
// PRCS at epoch 3 is available in memory so no need to reload
reloadCount: 0,
// 1 cp state for epoch 0 1 2 3
persistCount: 4,
// {root0, epoch: 0} {root8, epoch: 1} {root16, epoch: 2} {root23, epoch: 3} {root24, epoch: 3}
persistCount: 5,
// epoch 4, one for Current Root Checkpoint State and one for Previous Root Checkpoint State
numStatesInMemory: 2,
// chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted
numStatesPersisted: 4,
// chain is not finalized, same to persistCount
numStatesPersisted: 5,
// epoch 4
numEpochsInMemory: 1,
// chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted
Expand All @@ -186,15 +188,15 @@ describe(
skip: true,
},
/**
* Block slot 28 has parent slot 23, block slot 24 25 26 and 27 are reorged
* Block slot 28 has parent slot 19, block slot 24 25 26 and 27 are reorged
* --------------------------------|---
* / | ^ ^ ^ ^
* / | 28 29 32 33
* |----------------|----------
* 16 ^ ^ ^ ^ ^ ^
* 19 23 24 25 26 27
* ^
* PRCS at epoch 3 is persisted, CRCS is pruned
* both PRCS and CRCS are persisted since their roots are unknown to block state 33
*/
{
name: "maxCPStateEpochsInMemory=1, reorg middle slot of previous epoch",
Expand All @@ -204,12 +206,12 @@ describe(
maxCPStateEpochsInMemory: 1,
// reload CP state epoch 2 (slot = 16)
reloadCount: 1,
// 1 cp state for epoch 0 1 2 3
persistCount: 4,
// {root0, epoch: 0} {root8, epoch: 1} {root16, epoch: 2} {root23, epoch: 3} {root24, epoch: 3} {root19, epoch: 3}
persistCount: 6,
// epoch 4, one for Current Root Checkpoint State and one for Previous Root Checkpoint State
numStatesInMemory: 2,
// chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted
numStatesPersisted: 4,
// chain is not finalized, same to persist count
numStatesPersisted: 6,
// epoch 4
numEpochsInMemory: 1,
// chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted
Expand All @@ -218,15 +220,15 @@ describe(
skip: true,
},
/**
* Block slot 28 has parent slot 23, block slot 24 25 26 and 27 are reorged
* Block slot 28 has parent slot 15, block slot 24 25 26 and 27 are reorged
* --------------------------------------------|---
* / | ^ ^ ^ ^
* / | 28 29 32 33
* |----------------|----------------|----------
* ^ ^ 16 ^ ^ ^ ^ ^ ^
* |----------------|----------------|---------- ^
* ^ ^ 16 ^ ^ ^ ^ ^ ^ test end
* 8 15 19 23 24 25 26 27
*reload ^
* PRCS at epoch 3 is persisted, CRCS is pruned
* both PRCS and CRCS are persisted because roots are unknown to block 28
*/
{
name: "maxCPStateEpochsInMemory=1, reorg 2 epochs",
Expand All @@ -236,12 +238,12 @@ describe(
maxCPStateEpochsInMemory: 1,
// reload CP state epoch 2 (slot = 16)
reloadCount: 1,
// 1 cp state for epoch 0 1, 2 CP states for epoch 2, 1 cp state for epoch 3
persistCount: 5,
// {root0, epoch: 0} {root8, epoch: 1} {root16, epoch: 2} {root15, epoch: 2} {root23, epoch: 3} {root24, epoch: 3} {root15, epoch: 3}
persistCount: 7,
// epoch 4, one for Current Root Checkpoint State and one for Previous Root Checkpoint State
numStatesInMemory: 2,
// chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted, epoch 2 has 2 CP states
numStatesPersisted: 5,
// chain is not finalized, so same number to persistCount
numStatesPersisted: 7,
// epoch 4
numEpochsInMemory: 1,
// chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,14 +867,58 @@ describe("PersistentCheckpointStateCache", function () {
expect(await cache.getStateOrBytes(cp0aHex)).toBeNull();
});

// Real mainnet scenario: root1b reorg root1a, and later on it's reorged back to root1a
// processState is skipped for root1a because it's late, it's only called for root1b
// we should persist both checkpoint states {0a, 20} and {0b, 20} in order to have finalized checkpoint states later
// - {0a, 20} is persisted because it's the view of root1b state
// - {0b, 20} is persisted because it's unknown in root1b state
// epoch: 19 20 21 22 23
// |-----------|-----------|-----------|-----------|
// ^^ ^ ^
// || | |
// |0b --root1a|
// | |
// 0a---------root1b
it("reorg 1 epoch", async () => {
it("reorg 1 epoch, processState once", async () => {
fileApisBuffer = new Map();
const datastore = getTestDatastore(fileApisBuffer);
cache = new PersistentCheckpointStateCache(
{datastore, logger: testLogger(), shufflingCache: new ShufflingCache()},
{maxCPStateEpochsInMemory: 0, processLateBlock: true}
);

const root1a = Buffer.alloc(32, 100);
const state1a = states["cp0b"].clone();
state1a.slot = 20 * SLOTS_PER_EPOCH + SLOTS_PER_EPOCH + 3;
state1a.blockRoots.set(state1a.slot % SLOTS_PER_HISTORICAL_ROOT, root1a);
// state transition add to cache
cache.add(cp0b, states["cp0b"]);
// do not processState root1a because it's late

// no need to reload cp0b because it's available in block state
const root1b = Buffer.alloc(32, 101);
const state1b = states["cp0a"].clone();
state1b.slot = state1a.slot + 1;
state1b.blockRoots.set(state1b.slot % SLOTS_PER_HISTORICAL_ROOT, root1b);
// state transition add to cache
cache.add(cp0a, states["cp0a"]);

// need to persist 2 checkpoint states
expect(await cache.processState(toHexString(root1b), state1b)).toEqual(2);
// both are persisited
expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]);
expect(await cache.getStateOrBytes(cp0aHex)).toEqual(stateBytes["cp0a"]);
});

// Same to above, but we processState for both root1a and root1b
// epoch: 19 20 21 22 23
// |-----------|-----------|-----------|-----------|
// ^^ ^ ^
// || | |
// |0b --root1a|
// | |
// 0a---------root1b
it("reorg 1 epoch, processState twice", async () => {
expect(await cache.processState(toHexString(root0b), states["cp0b"])).toEqual(1);
await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]);
expect(await cache.getStateOrBytes(cp0aHex)).toBeNull();
Expand Down

0 comments on commit 9f75183

Please sign in to comment.