Skip to content

Commit

Permalink
Move endpoints to standalone functions
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain committed Dec 26, 2023
1 parent 1200c59 commit f6907c2
Show file tree
Hide file tree
Showing 21 changed files with 1,193 additions and 937 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import {ServerApi, routes} from "@lodestar/api";
import {ApiModules} from "../../types.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildGetAggregatedAttestation(
{chain, metrics}: ApiModules,
{notWhileSyncing, waitForSlotWithDisparity}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["getAggregatedAttestation"] {
return async function getAggregatedAttestation(attestationDataRoot, slot) {
notWhileSyncing();

await waitForSlotWithDisparity(slot); // Must never request for a future slot > currentSlot

const aggregate = chain.attestationPool.getAggregate(slot, attestationDataRoot);
metrics?.production.producedAggregateParticipants.observe(aggregate.aggregationBits.getTrueBitIndexes().length);

return {
data: aggregate,
};
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import {ServerApi, routes} from "@lodestar/api";
import {toHex} from "@lodestar/utils";
import {attesterShufflingDecisionRoot} from "@lodestar/state-transition";
import {RegenCaller} from "../../../../chain/regen/interface.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {ApiModules} from "../../types.js";
import {getPubkeysForIndices} from "../utils.js";
import {ApiError} from "../../errors.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildGetAttesterDuties(
{chain}: ApiModules,
{notWhileSyncing, getGenesisBlockRoot, waitForNextClosestEpoch}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["getAttesterDuties"] {
return async function getAttesterDuties(epoch, validatorIndices) {
notWhileSyncing();

if (validatorIndices.length === 0) {
throw new ApiError(400, "No validator to get attester duties");
}

// May request for an epoch that's in the future
await waitForNextClosestEpoch();

// should not compare to headEpoch in order to handle skipped slots
// Check if the epoch is in the future after waiting for requested slot
if (epoch > chain.clock.currentEpoch + 1) {
throw new ApiError(400, "Cannot get duties for epoch more than one ahead");
}

const head = chain.forkChoice.getHead();
const state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);

// TODO: Determine what the current epoch would be if we fast-forward our system clock by
// `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
//
// Most of the time, `tolerantCurrentEpoch` will be equal to `currentEpoch`. However, during
// the first `MAXIMUM_GOSSIP_CLOCK_DISPARITY` duration of the epoch `tolerantCurrentEpoch`
// will equal `currentEpoch + 1`

// Check that all validatorIndex belong to the state before calling getCommitteeAssignments()
const pubkeys = getPubkeysForIndices(state.validators, validatorIndices);
const committeeAssignments = state.epochCtx.getCommitteeAssignments(epoch, validatorIndices);
const duties: routes.validator.AttesterDuty[] = [];
for (let i = 0, len = validatorIndices.length; i < len; i++) {
const validatorIndex = validatorIndices[i];
const duty = committeeAssignments.get(validatorIndex) as routes.validator.AttesterDuty | undefined;
if (duty) {
// Mutate existing object instead of re-creating another new object with spread operator
// Should be faster and require less memory
duty.pubkey = pubkeys[i];
duties.push(duty);
}
}

const dependentRoot = attesterShufflingDecisionRoot(state, epoch) || (await getGenesisBlockRoot(state));

return {
data: duties,
dependentRoot: toHex(dependentRoot),
executionOptimistic: isOptimisticBlock(head),
};
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import {ServerApi, routes} from "@lodestar/api";
import {ApiModules} from "../../types.js";
import {ApiError} from "../../errors.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildGetLiveness(
{chain}: ApiModules,
_deps: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["getLiveness"] {
return async function getLiveness(epoch, validatorIndices) {
if (validatorIndices.length === 0) {
return {
data: [],
};
}
const currentEpoch = chain.clock.currentEpoch;
if (epoch < currentEpoch - 1 || epoch > currentEpoch + 1) {
throw new ApiError(
400,
`Request epoch ${epoch} is more than one epoch before or after the current epoch ${currentEpoch}`
);
}

return {
data: validatorIndices.map((index) => ({
index,
isLive: chain.validatorSeenAtEpoch(index, epoch),
})),
};
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import {ServerApi, routes} from "@lodestar/api";
import {
CachedBeaconStateAllForks,
computeStartSlotAtEpoch,
proposerShufflingDecisionRoot,
} from "@lodestar/state-transition";
import {ValidatorIndex} from "@lodestar/types";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {toHex} from "@lodestar/utils";
import {RegenCaller} from "../../../../chain/regen/interface.js";
import {ApiModules} from "../../types.js";
import {getPubkeysForIndices} from "../utils.js";
import {SCHEDULER_LOOKAHEAD_FACTOR} from "../../../../chain/prepareNextSlot.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildGetProposerDuties(
{chain, config, metrics}: ApiModules,
{
notWhileSyncing,
getGenesisBlockRoot,
currentEpochWithDisparity,
msToNextEpoch,
waitForCheckpointState,
}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["getProposerDuties"] {
return async function getProposerDuties(epoch) {
notWhileSyncing();

// Early check that epoch is within [current_epoch, current_epoch + 1], or allow for pre-genesis
const currentEpoch = currentEpochWithDisparity();
const nextEpoch = currentEpoch + 1;
if (currentEpoch >= 0 && epoch !== currentEpoch && epoch !== nextEpoch) {
throw Error(`Requested epoch ${epoch} must equal current ${currentEpoch} or next epoch ${nextEpoch}`);
}

const head = chain.forkChoice.getHead();
let state: CachedBeaconStateAllForks | undefined = undefined;
const slotMs = config.SECONDS_PER_SLOT * 1000;
const prepareNextSlotLookAheadMs = slotMs / SCHEDULER_LOOKAHEAD_FACTOR;
const toNextEpochMs = msToNextEpoch();
// validators may request next epoch's duties when it's close to next epoch
// this is to avoid missed block proposal due to 0 epoch look ahead
if (epoch === nextEpoch && toNextEpochMs < prepareNextSlotLookAheadMs) {
// wait for maximum 1 slot for cp state which is the timeout of validator api
const cpState = await waitForCheckpointState({rootHex: head.blockRoot, epoch});
if (cpState) {
state = cpState;
metrics?.duties.requestNextEpochProposalDutiesHit.inc();
} else {
metrics?.duties.requestNextEpochProposalDutiesMiss.inc();
}
}

if (!state) {
state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);
}

const stateEpoch = state.epochCtx.epoch;
let indexes: ValidatorIndex[] = [];

if (epoch === stateEpoch) {
indexes = state.epochCtx.getBeaconProposers();
} else if (epoch === stateEpoch + 1) {
// Requesting duties for next epoch is allow since they can be predicted with high probabilities.
// @see `epochCtx.getBeaconProposersNextEpoch` JSDocs for rationale.
indexes = state.epochCtx.getBeaconProposersNextEpoch();
} else {
// Should never happen, epoch is checked to be in bounds above
throw Error(`Proposer duties for epoch ${epoch} not supported, current epoch ${stateEpoch}`);
}

// NOTE: this is the fastest way of getting compressed pubkeys.
// See benchmark -> packages/lodestar/test/perf/api/impl/validator/attester.test.ts
// After dropping the flat caches attached to the CachedBeaconState it's no longer available.
// TODO: Add a flag to just send 0x00 as pubkeys since the Lodestar validator does not need them.
const pubkeys = getPubkeysForIndices(state.validators, indexes);

const startSlot = computeStartSlotAtEpoch(stateEpoch);
const duties: routes.validator.ProposerDuty[] = [];
for (let i = 0; i < SLOTS_PER_EPOCH; i++) {
duties.push({slot: startSlot + i, validatorIndex: indexes[i], pubkey: pubkeys[i]});
}

// Returns `null` on the one-off scenario where the genesis block decides its own shuffling.
// It should be set to the latest block applied to `self` or the genesis block root.
const dependentRoot = proposerShufflingDecisionRoot(state) || (await getGenesisBlockRoot(state));

return {
data: duties,
dependentRoot: toHex(dependentRoot),
executionOptimistic: isOptimisticBlock(head),
};
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import {ServerApi, routes} from "@lodestar/api";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {ApiModules} from "../../types.js";
import {getPubkeysForIndices} from "../utils.js";
import {ApiError} from "../../errors.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildGetSyncCommitteeDuties(
{chain}: ApiModules,
{notWhileSyncing, waitForNextClosestEpoch}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["getSyncCommitteeDuties"] {
/**
* `POST /eth/v1/validator/duties/sync/{epoch}`
*
* Requests the beacon node to provide a set of sync committee duties for a particular epoch.
* - Although pubkey can be inferred from the index we return it to keep this call analogous with the one that
* fetches attester duties.
* - `sync_committee_index` is the index of the validator in the sync committee. This can be used to infer the
* subnet to which the contribution should be broadcast. Note, there can be multiple per validator.
*
* https://github.com/ethereum/beacon-APIs/pull/134
*
* @param validatorIndices an array of the validator indices for which to obtain the duties.
*/
return async function getSyncCommitteeDuties(epoch, validatorIndices) {
notWhileSyncing();

if (validatorIndices.length === 0) {
throw new ApiError(400, "No validator to get attester duties");
}

// May request for an epoch that's in the future
await waitForNextClosestEpoch();

// sync committee duties have a lookahead of 1 day. Assuming the validator only requests duties for upcoming
// epochs, the head state will very likely have the duties available for the requested epoch.
// Note: does not support requesting past duties
const head = chain.forkChoice.getHead();
const state = chain.getHeadState();

// Check that all validatorIndex belong to the state before calling getCommitteeAssignments()
const pubkeys = getPubkeysForIndices(state.validators, validatorIndices);
// Ensures `epoch // EPOCHS_PER_SYNC_COMMITTEE_PERIOD <= current_epoch // EPOCHS_PER_SYNC_COMMITTEE_PERIOD + 1`
const syncCommitteeCache = state.epochCtx.getIndexedSyncCommitteeAtEpoch(epoch);
const syncCommitteeValidatorIndexMap = syncCommitteeCache.validatorIndexMap;

const duties: routes.validator.SyncDuty[] = [];
for (let i = 0, len = validatorIndices.length; i < len; i++) {
const validatorIndex = validatorIndices[i];
const validatorSyncCommitteeIndices = syncCommitteeValidatorIndexMap.get(validatorIndex);
if (validatorSyncCommitteeIndices) {
duties.push({
pubkey: pubkeys[i],
validatorIndex,
validatorSyncCommitteeIndices,
});
}
}

return {
data: duties,
executionOptimistic: isOptimisticBlock(head),
};
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import {ServerApi, routes} from "@lodestar/api";
import {ApiModules} from "../../types.js";
import {computeSubnetForCommitteesAtSlot} from "../utils.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildPrepareBeaconCommitteeSubnet(
{metrics, network}: ApiModules,
{notWhileSyncing}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["prepareBeaconCommitteeSubnet"] {
return async function prepareBeaconCommitteeSubnet(subscriptions) {
notWhileSyncing();

await network.prepareBeaconCommitteeSubnets(
subscriptions.map(({validatorIndex, slot, isAggregator, committeesAtSlot, committeeIndex}) => ({
validatorIndex: validatorIndex,
subnet: computeSubnetForCommitteesAtSlot(slot, committeesAtSlot, committeeIndex),
slot: slot,
isAggregator: isAggregator,
}))
);

// TODO:
// If the discovery mechanism isn't disabled, attempt to set up a peer discovery for the
// required subnets.

if (metrics) {
for (const subscription of subscriptions) {
metrics.registerLocalValidator(subscription.validatorIndex);
}
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {ServerApi, routes} from "@lodestar/api";
import {ApiModules} from "../../types.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildPrepareBeaconProposer(
{chain}: ApiModules,
_deps: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["prepareBeaconProposer"] {
return async function prepareBeaconProposer(proposers) {
await chain.updateBeaconProposerData(chain.clock.currentEpoch, proposers);
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import {ServerApi, routes} from "@lodestar/api";
import {SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {CommitteeSubscription} from "../../../../network/subnets/interface.js";
import {ApiModules} from "../../types.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildPrepareSyncCommitteeSubnets(
{network, metrics}: ApiModules,
{notWhileSyncing}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["prepareSyncCommitteeSubnets"] {
/**
* POST `/eth/v1/validator/sync_committee_subscriptions`
*
* Subscribe to a number of sync committee subnets.
* Sync committees are not present in phase0, but are required for Altair networks.
* Subscribing to sync committee subnets is an action performed by VC to enable network participation in Altair networks,
* and only required if the VC has an active validator in an active sync committee.
*
* https://github.com/ethereum/beacon-APIs/pull/136
*/
return async function prepareSyncCommitteeSubnets(subscriptions) {
notWhileSyncing();

// A `validatorIndex` can be in multiple subnets, so compute the CommitteeSubscription with double for loop
const subs: CommitteeSubscription[] = [];
for (const sub of subscriptions) {
for (const committeeIndex of sub.syncCommitteeIndices) {
const subnet = Math.floor(committeeIndex / SYNC_COMMITTEE_SUBNET_SIZE);
subs.push({
validatorIndex: sub.validatorIndex,
subnet: subnet,
// Subscribe until the end of `untilEpoch`: https://github.com/ethereum/beacon-APIs/pull/136#issuecomment-840315097
slot: computeStartSlotAtEpoch(sub.untilEpoch + 1),
isAggregator: true,
});
}
}

await network.prepareSyncCommitteeSubnets(subs);

if (metrics) {
for (const subscription of subscriptions) {
metrics.registerLocalValidatorInSyncCommittee(subscription.validatorIndex, subscription.untilEpoch);
}
}
};
}
Loading

0 comments on commit f6907c2

Please sign in to comment.