Skip to content

Commit

Permalink
chore: refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Jul 9, 2024
1 parent 61c7a50 commit 104aa56
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 116 deletions.
13 changes: 8 additions & 5 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -826,9 +826,6 @@ export class BeaconChain implements IBeaconChain {
attHeadBlock: ProtoBlock,
regenCaller: RegenCaller
): Promise<EpochShuffling> {
// this is to prevent multiple calls to get shuffling for the same epoch and dependent root
// any subsequent calls of the same epoch and dependent root will wait for this promise to resolve
this.shufflingCache.insertPromise(attEpoch, shufflingDependentRoot);
const blockEpoch = computeEpochAtSlot(attHeadBlock.slot);

let state: CachedBeaconStateAllForks;
Expand All @@ -853,8 +850,14 @@ export class BeaconChain implements IBeaconChain {
state = await this.regen.getState(attHeadBlock.stateRoot, regenCaller);
}

// resolve the promise to unblock other calls of the same epoch and dependent root
return this.shufflingCache.get(attEpoch, getShufflingDecisionBlock(state, attEpoch));
const decisionRoot = getShufflingDecisionBlock(state, attEpoch);
const shuffling = await this.shufflingCache.get(attEpoch, decisionRoot);
if (!shuffling) {
// This will be essentially unreachable considering regen should build the shuffling for this epoch
// but need to handle anyhow
throw Error(`UNREACHABLE: Shuffling not found for attestation epoch ${attEpoch} decisionRoot ${decisionRoot}`);
}
return shuffling;
}

/**
Expand Down
147 changes: 64 additions & 83 deletions packages/beacon-node/src/chain/shufflingCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,42 +68,12 @@ export class ShufflingCache implements IShufflingCache {
this.maxEpochs = opts.maxShufflingCacheEpochs ?? MAX_EPOCHS;
}

/**
* Insert a promise to make sure we don't regen state for the same shuffling.
* Bound by MAX_SHUFFLING_PROMISE to make sure our node does not blow up.
*/
insertPromise(shufflingEpoch: Epoch, decisionRootHex: RootHex): void {
const promiseCount = Array.from(this.itemsByDecisionRootByEpoch.values())
.flatMap((innerMap) => Array.from(innerMap.values()))
.filter((item) => isPromiseCacheItem(item)).length;
if (promiseCount >= MAX_PROMISES) {
throw new Error(
`Too many shuffling promises: ${promiseCount}, shufflingEpoch: ${shufflingEpoch}, decisionRootHex: ${decisionRootHex}`
);
}
let resolveFn: ((shuffling: EpochShuffling) => void) | null = null;
const promise = new Promise<EpochShuffling>((resolve) => {
resolveFn = resolve;
});
if (resolveFn === null) {
throw new Error("Promise Constructor was not executed immediately");
}

const cacheItem: PromiseCacheItem = {
type: CacheItemType.promise,
promise,
resolveFn,
};
this.add(shufflingEpoch, decisionRootHex, cacheItem);
this.metrics?.shufflingCache.insertPromiseCount.inc();
}

/**
* Most of the time, this should return a shuffling immediately.
* If there's a promise, it means we are computing the same shuffling, so we wait for the promise to resolve.
* Return null if we don't have a shuffling for this epoch and dependentRootHex.
*/
async getShufflingOrNull(shufflingEpoch: Epoch, decisionRootHex: RootHex): Promise<EpochShuffling | null> {
async get(shufflingEpoch: Epoch, decisionRootHex: RootHex): Promise<EpochShuffling | null> {
const cacheItem = this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).get(decisionRootHex);
if (cacheItem === undefined) {
return null;
Expand All @@ -117,18 +87,6 @@ export class ShufflingCache implements IShufflingCache {
}
}

async get(shufflingEpoch: Epoch, decisionRootHex: RootHex): Promise<EpochShuffling> {
const item = await this.getShufflingOrNull(shufflingEpoch, decisionRootHex);
if (!item) {
throw new ShufflingCacheError({
code: ShufflingCacheErrorCode.NO_SHUFFLING_FOUND,
epoch: shufflingEpoch,
decisionRoot: decisionRootHex,
});
}
return item;
}

/**
* Same to getShufflingOrNull() function but synchronous.
*/
Expand All @@ -146,59 +104,82 @@ export class ShufflingCache implements IShufflingCache {
return null;
}

getOrBuildSync(
buildSync(epoch: number, decisionRoot: string, state: BeaconStateAllForks, activeIndices: number[]): EpochShuffling {
const cachedShuffling = this.getSync(epoch, decisionRoot);
if (cachedShuffling) {
return cachedShuffling;
}

const shuffling = computeEpochShuffling(state, activeIndices, epoch);
this.set(shuffling, decisionRoot);
return shuffling;
}

async build(
epoch: number,
decisionRoot: string,
state: BeaconStateAllForks,
activeIndices: number[]
): EpochShuffling {
const cacheItem = this.itemsByDecisionRootByEpoch.getOrDefault(epoch).get(decisionRoot);
if (cacheItem && isShufflingCacheItem(cacheItem)) {
// this.metrics?.shufflingCache.cacheHitInEpochTransition();
return cacheItem.shuffling;
): Promise<EpochShuffling> {
const cachedShuffling = await this.get(epoch, decisionRoot);
if (cachedShuffling) {
return cachedShuffling;
}
// if (cacheItem) {
// this.metrics?.shufflingCache.cacheMissInEpochTransition();
// } else {
// this.metrics?.shufflingCache.shufflingPromiseNotResolvedInEpochTransition();
// }
const shuffling = computeEpochShuffling(state, activeIndices, epoch);
this.add(epoch, decisionRoot, {
type: CacheItemType.shuffling,
shuffling,
});
return shuffling;
}

build(epoch: number, decisionRoot: string, state: BeaconStateAllForks, activeIndices: number[]): void {
let resolveFn: (shuffling: EpochShuffling) => void = () => {};
this.add(epoch, decisionRoot, {
type: CacheItemType.promise,
resolveFn,
promise: new Promise<EpochShuffling>((resolve) => {
resolveFn = resolve;
}),
});
// this is to prevent multiple calls to get shuffling for the same epoch and dependent root
// any subsequent calls of the same epoch and dependent root will wait for this promise to resolve
const cacheItem = this.insertPromise(epoch, decisionRoot);
// TODO: replace this call with a worker
setTimeout(() => {
const shuffling = computeEpochShuffling(state, activeIndices, epoch);
this.add(epoch, decisionRoot, {
type: CacheItemType.shuffling,
shuffling,
});
}, 100);
this.set(shuffling, decisionRoot);
}, 5);
return cacheItem.promise;
}

set(shuffling: EpochShuffling, decisionRoot: string): void {
const cacheItem: ShufflingCacheItem = {
shuffling,
type: CacheItemType.shuffling,
};
this.add(shuffling.epoch, decisionRoot, cacheItem);
const items = this.itemsByDecisionRootByEpoch.getOrDefault(shuffling.epoch);
// if a pending shuffling promise exists, resolve it
const item = items.get(decisionRoot);
if (item !== undefined && isPromiseCacheItem(item)) {
item.resolveFn(shuffling);
}
// set the shuffling
items.set(decisionRoot, {type: CacheItemType.shuffling, shuffling});
// prune the cache
pruneSetToMax(this.itemsByDecisionRootByEpoch, this.maxEpochs);
}

private add(shufflingEpoch: Epoch, decisionBlock: RootHex, cacheItem: CacheItem): void {
this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).set(decisionBlock, cacheItem);
pruneSetToMax(this.itemsByDecisionRootByEpoch, this.maxEpochs);
/**
* Insert a promise to make sure we don't regen state for the same shuffling.
* Bound by MAX_SHUFFLING_PROMISE to make sure our node does not blow up.
*/
private insertPromise(shufflingEpoch: Epoch, decisionRoot: RootHex): PromiseCacheItem {
const promiseCount = Array.from(this.itemsByDecisionRootByEpoch.values())
.flatMap((innerMap) => Array.from(innerMap.values()))
.filter((item) => isPromiseCacheItem(item)).length;
if (promiseCount >= MAX_PROMISES) {
throw new Error(
`Too many shuffling promises: ${promiseCount}, shufflingEpoch: ${shufflingEpoch}, decisionRootHex: ${decisionRoot}`
);
}
let resolveFn: ((shuffling: EpochShuffling) => void) | null = null;
const promise = new Promise<EpochShuffling>((resolve) => {
resolveFn = resolve;
});
if (resolveFn === null) {
throw new Error("Promise Constructor was not executed immediately");
}

this.metrics?.shufflingCache.insertPromiseCount.inc();

const cacheItem: PromiseCacheItem = {
type: CacheItemType.promise,
promise,
resolveFn,
};
this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).set(decisionRoot, cacheItem);
return cacheItem;
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ export async function getShufflingForAttestationVerification(
const blockEpoch = computeEpochAtSlot(attHeadBlock.slot);
const shufflingDependentRoot = getShufflingDependentRoot(chain.forkChoice, attEpoch, blockEpoch, attHeadBlock);

const shuffling = await chain.shufflingCache.getShufflingOrNull(attEpoch, shufflingDependentRoot);
const shuffling = await chain.shufflingCache.get(attEpoch, shufflingDependentRoot);
if (shuffling) {
// most of the time, we should get the shuffling from cache
chain.metrics?.gossipAttestation.shufflingCacheHit.inc({caller: regenCaller});
Expand Down
23 changes: 13 additions & 10 deletions packages/beacon-node/test/unit/chain/shufflingCache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ describe("ShufflingCache", function () {
const stateSlot = 100;
const state = generateTestCachedBeaconStateOnlyValidators({vc, slot: stateSlot});
const currentEpoch = state.epochCtx.currentShuffling.epoch;
const activeIndices = Array.from(state.epochCtx.currentShuffling.activeIndices);
let shufflingCache: ShufflingCache;

beforeEach(() => {
shufflingCache = new ShufflingCache(null, {maxShufflingCacheEpochs: 1});
shufflingCache.processState(state, currentEpoch);
const decisionRoot = getShufflingDecisionBlock(state, currentEpoch);
shufflingCache.buildSync(currentEpoch, decisionRoot, state, activeIndices);
});

it("should get shuffling from cache", async function () {
Expand All @@ -26,29 +28,30 @@ describe("ShufflingCache", function () {
const decisionRoot = getShufflingDecisionBlock(state, currentEpoch);
expect(await shufflingCache.get(currentEpoch, decisionRoot)).toEqual(state.epochCtx.currentShuffling);
// insert promises at the same epoch does not prune the cache
shufflingCache.insertPromise(currentEpoch, "0x00");
shufflingCache["insertPromise"](currentEpoch, "0x00");
expect(await shufflingCache.get(currentEpoch, decisionRoot)).toEqual(state.epochCtx.currentShuffling);
// insert shufflings at other epochs does prune the cache
shufflingCache.processState(state, currentEpoch + 1);
shufflingCache.buildSync(currentEpoch + 1, decisionRoot, state, activeIndices);
// the current shuffling is not available anymore
expect(await shufflingCache.get(currentEpoch, decisionRoot)).toBeNull();
});

it("should return shuffling from promise", async function () {
const nextEpoch = currentEpoch + 1;
const nextDecisionRoot = getShufflingDecisionBlock(state, currentEpoch + 1);
shufflingCache.insertPromise(currentEpoch + 1, nextDecisionRoot);
const shufflingRequest0 = shufflingCache.get(currentEpoch + 1, nextDecisionRoot);
const shufflingRequest1 = shufflingCache.get(currentEpoch + 1, nextDecisionRoot);
shufflingCache.processState(state, currentEpoch + 1);
shufflingCache["insertPromise"](nextEpoch, nextDecisionRoot);
const shufflingRequest0 = shufflingCache.get(nextEpoch, nextDecisionRoot);
const shufflingRequest1 = shufflingCache.get(nextEpoch, nextDecisionRoot);
shufflingCache.buildSync(nextEpoch, nextDecisionRoot, state, activeIndices);
expect(await shufflingRequest0).toEqual(state.epochCtx.nextShuffling);
expect(await shufflingRequest1).toEqual(state.epochCtx.nextShuffling);
});

it("should support up to 2 promises at a time", async function () {
// insert 2 promises at the same epoch
shufflingCache.insertPromise(currentEpoch, "0x00");
shufflingCache.insertPromise(currentEpoch, "0x01");
shufflingCache["insertPromise"](currentEpoch, "0x00");
shufflingCache["insertPromise"](currentEpoch, "0x01");
// inserting other promise should throw error
expect(() => shufflingCache.insertPromise(currentEpoch, "0x02")).toThrow();
expect(() => shufflingCache["insertPromise"](currentEpoch, "0x02")).toThrow();
});
});
8 changes: 6 additions & 2 deletions packages/beacon-node/test/utils/validationData/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
computeEpochAtSlot,
computeSigningRoot,
computeStartSlotAtEpoch,
getActiveValidatorIndices,
getShufflingDecisionBlock,
} from "@lodestar/state-transition";
import {ProtoBlock, IForkChoice, ExecutionStatus, DataAvailabilityStatus} from "@lodestar/fork-choice";
Expand Down Expand Up @@ -82,9 +83,12 @@ export function getAttestationValidData(opts: AttestationValidDataOpts): {
};

const shufflingCache = new ShufflingCache();
shufflingCache.processState(state, state.epochCtx.currentShuffling.epoch);
shufflingCache.processState(state, state.epochCtx.nextShuffling.epoch);
const dependentRoot = getShufflingDecisionBlock(state, state.epochCtx.currentShuffling.epoch);
shufflingCache.set(state.epochCtx.currentShuffling, dependentRoot);

const nextEpoch = state.epochCtx.currentShuffling.epoch + 1;
const nextDependentRoot = getShufflingDecisionBlock(state, nextEpoch);
shufflingCache.buildSync(nextEpoch, nextDependentRoot, state, getActiveValidatorIndices(state, nextEpoch));

const forkChoice = {
getBlock: (root) => {
Expand Down
5 changes: 3 additions & 2 deletions packages/state-transition/src/cache/epochCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -581,15 +581,16 @@ export class EpochCache {
this.currentShuffling = this.nextShuffling;
} else {
this.currentShuffling =
this.shufflingCache?.getOrBuildSync(this.nextEpoch, this.nextDecisionRoot, state, this.nextActiveIndices) ??
this.shufflingCache?.buildSync(this.nextEpoch, this.nextDecisionRoot, state, this.nextActiveIndices) ??
computeEpochShuffling(state, this.nextActiveIndices, this.nextEpoch);
}

const currentEpoch = this.nextEpoch;
this.nextShuffling = null;
this.nextDecisionRoot = getShufflingDecisionBlock(state, currentEpoch + 1);
this.nextActiveIndices = epochTransitionCache.nextEpochShufflingActiveValidatorIndices;
this.shufflingCache?.build(currentEpoch + 1, this.nextDecisionRoot, state, this.nextActiveIndices);
// TODO move this out to beacon node
void this.shufflingCache?.build(currentEpoch + 1, this.nextDecisionRoot, state, this.nextActiveIndices);

// Roll current proposers into previous proposers for metrics
this.proposersPrevEpoch = this.proposers;
Expand Down
30 changes: 28 additions & 2 deletions packages/state-transition/src/util/epochShuffling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,40 @@ import {computeStartSlotAtEpoch} from "./epoch.js";
import {getBlockRootAtSlot} from "./blockRoot.js";

export interface IShufflingCache {
/**
* Will synchronously get a shuffling if it is available or will return null if not.
*/
getSync(epoch: Epoch, decisionRoot: RootHex): EpochShuffling | null;
getOrBuildSync(

/**
* Will synchronously get a shuffling if it is available.
*
* If a shuffling is not immediately available, a shuffling will be calculated.
*
* NOTE: this may recalculate an already in-progress shuffling.
*/
buildSync(
epoch: Epoch,
decisionRoot: RootHex,
state: BeaconStateAllForks,
activeIndices: ValidatorIndex[]
): EpochShuffling;
build(epoch: Epoch, decisionRoot: RootHex, state: BeaconStateAllForks, activeIndices: ValidatorIndex[]): void;

/**
* Will immediately return a shuffling if it is available, or a promise to an in-progress shuffling calculation.
*
* If neither is available, a shuffling will be calculated.
*/
build(
epoch: Epoch,
decisionRoot: RootHex,
state: BeaconStateAllForks,
activeIndices: ValidatorIndex[]
): Promise<EpochShuffling>;

/**
* Set a shuffling for a given epoch and decisionRoot.
*/
set(shuffling: EpochShuffling, decisionRoot: RootHex): void;
}

Expand Down
Loading

0 comments on commit 104aa56

Please sign in to comment.