Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move validator api endpoints to standalone functions #6238

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import {ServerApi, routes} from "@lodestar/api";
import {ApiModules} from "../../types.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildGetAggregatedAttestation(
{chain, metrics}: ApiModules,
{notWhileSyncing, waitForSlotWithDisparity}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["getAggregatedAttestation"] {
return async function getAggregatedAttestation(attestationDataRoot, slot) {
notWhileSyncing();

await waitForSlotWithDisparity(slot); // Must never request for a future slot > currentSlot

const aggregate = chain.attestationPool.getAggregate(slot, attestationDataRoot);
metrics?.production.producedAggregateParticipants.observe(aggregate.aggregationBits.getTrueBitIndexes().length);

return {
data: aggregate,
};
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import {ServerApi, routes} from "@lodestar/api";
import {toHex} from "@lodestar/utils";
import {attesterShufflingDecisionRoot} from "@lodestar/state-transition";
import {RegenCaller} from "../../../../chain/regen/interface.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {ApiModules} from "../../types.js";
import {getPubkeysForIndices} from "../utils.js";
import {ApiError} from "../../errors.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildGetAttesterDuties(
{chain}: ApiModules,
{notWhileSyncing, getGenesisBlockRoot, waitForNextClosestEpoch}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["getAttesterDuties"] {
return async function getAttesterDuties(epoch, validatorIndices) {
notWhileSyncing();

if (validatorIndices.length === 0) {
throw new ApiError(400, "No validator to get attester duties");
}

// May request for an epoch that's in the future
await waitForNextClosestEpoch();

// should not compare to headEpoch in order to handle skipped slots
// Check if the epoch is in the future after waiting for requested slot
if (epoch > chain.clock.currentEpoch + 1) {
throw new ApiError(400, "Cannot get duties for epoch more than one ahead");
}

const head = chain.forkChoice.getHead();
const state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);

// TODO: Determine what the current epoch would be if we fast-forward our system clock by
// `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
//
// Most of the time, `tolerantCurrentEpoch` will be equal to `currentEpoch`. However, during
// the first `MAXIMUM_GOSSIP_CLOCK_DISPARITY` duration of the epoch `tolerantCurrentEpoch`
// will equal `currentEpoch + 1`

// Check that all validatorIndex belong to the state before calling getCommitteeAssignments()
const pubkeys = getPubkeysForIndices(state.validators, validatorIndices);
const committeeAssignments = state.epochCtx.getCommitteeAssignments(epoch, validatorIndices);
const duties: routes.validator.AttesterDuty[] = [];
for (let i = 0, len = validatorIndices.length; i < len; i++) {
const validatorIndex = validatorIndices[i];
const duty = committeeAssignments.get(validatorIndex) as routes.validator.AttesterDuty | undefined;
if (duty) {
// Mutate existing object instead of re-creating another new object with spread operator
// Should be faster and require less memory
duty.pubkey = pubkeys[i];
duties.push(duty);
}
}

const dependentRoot = attesterShufflingDecisionRoot(state, epoch) || (await getGenesisBlockRoot(state));

return {
data: duties,
dependentRoot: toHex(dependentRoot),
executionOptimistic: isOptimisticBlock(head),
};
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import {ServerApi, routes} from "@lodestar/api";
import {ApiModules} from "../../types.js";
import {ApiError} from "../../errors.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildGetLiveness(
{chain}: ApiModules,
_deps: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["getLiveness"] {
return async function getLiveness(epoch, validatorIndices) {
if (validatorIndices.length === 0) {
return {
data: [],
};
}
const currentEpoch = chain.clock.currentEpoch;
if (epoch < currentEpoch - 1 || epoch > currentEpoch + 1) {
throw new ApiError(
400,
`Request epoch ${epoch} is more than one epoch before or after the current epoch ${currentEpoch}`
);
}

return {
data: validatorIndices.map((index) => ({
index,
isLive: chain.validatorSeenAtEpoch(index, epoch),
})),
};
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import {ServerApi, routes} from "@lodestar/api";
import {
CachedBeaconStateAllForks,
computeStartSlotAtEpoch,
proposerShufflingDecisionRoot,
} from "@lodestar/state-transition";
import {ValidatorIndex} from "@lodestar/types";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {toHex} from "@lodestar/utils";
import {RegenCaller} from "../../../../chain/regen/interface.js";
import {ApiModules} from "../../types.js";
import {getPubkeysForIndices} from "../utils.js";
import {SCHEDULER_LOOKAHEAD_FACTOR} from "../../../../chain/prepareNextSlot.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildGetProposerDuties(
{chain, config, metrics}: ApiModules,
{
notWhileSyncing,
getGenesisBlockRoot,
currentEpochWithDisparity,
msToNextEpoch,
waitForCheckpointState,
}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["getProposerDuties"] {
return async function getProposerDuties(epoch) {
notWhileSyncing();

// Early check that epoch is within [current_epoch, current_epoch + 1], or allow for pre-genesis
const currentEpoch = currentEpochWithDisparity();
const nextEpoch = currentEpoch + 1;
if (currentEpoch >= 0 && epoch !== currentEpoch && epoch !== nextEpoch) {
throw Error(`Requested epoch ${epoch} must equal current ${currentEpoch} or next epoch ${nextEpoch}`);
}

const head = chain.forkChoice.getHead();
let state: CachedBeaconStateAllForks | undefined = undefined;
const slotMs = config.SECONDS_PER_SLOT * 1000;
const prepareNextSlotLookAheadMs = slotMs / SCHEDULER_LOOKAHEAD_FACTOR;
const toNextEpochMs = msToNextEpoch();
// validators may request next epoch's duties when it's close to next epoch
// this is to avoid missed block proposal due to 0 epoch look ahead
if (epoch === nextEpoch && toNextEpochMs < prepareNextSlotLookAheadMs) {
// wait for maximum 1 slot for cp state which is the timeout of validator api
const cpState = await waitForCheckpointState({rootHex: head.blockRoot, epoch});
if (cpState) {
state = cpState;
metrics?.duties.requestNextEpochProposalDutiesHit.inc();
} else {
metrics?.duties.requestNextEpochProposalDutiesMiss.inc();
}
}

if (!state) {
state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);
}

const stateEpoch = state.epochCtx.epoch;
let indexes: ValidatorIndex[] = [];

if (epoch === stateEpoch) {
indexes = state.epochCtx.getBeaconProposers();
} else if (epoch === stateEpoch + 1) {
// Requesting duties for next epoch is allow since they can be predicted with high probabilities.
// @see `epochCtx.getBeaconProposersNextEpoch` JSDocs for rationale.
indexes = state.epochCtx.getBeaconProposersNextEpoch();
} else {
// Should never happen, epoch is checked to be in bounds above
throw Error(`Proposer duties for epoch ${epoch} not supported, current epoch ${stateEpoch}`);
}

// NOTE: this is the fastest way of getting compressed pubkeys.
// See benchmark -> packages/lodestar/test/perf/api/impl/validator/attester.test.ts
// After dropping the flat caches attached to the CachedBeaconState it's no longer available.
// TODO: Add a flag to just send 0x00 as pubkeys since the Lodestar validator does not need them.
const pubkeys = getPubkeysForIndices(state.validators, indexes);

const startSlot = computeStartSlotAtEpoch(stateEpoch);
const duties: routes.validator.ProposerDuty[] = [];
for (let i = 0; i < SLOTS_PER_EPOCH; i++) {
duties.push({slot: startSlot + i, validatorIndex: indexes[i], pubkey: pubkeys[i]});
}

// Returns `null` on the one-off scenario where the genesis block decides its own shuffling.
// It should be set to the latest block applied to `self` or the genesis block root.
const dependentRoot = proposerShufflingDecisionRoot(state) || (await getGenesisBlockRoot(state));

return {
data: duties,
dependentRoot: toHex(dependentRoot),
executionOptimistic: isOptimisticBlock(head),
};
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import {ServerApi, routes} from "@lodestar/api";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {ApiModules} from "../../types.js";
import {getPubkeysForIndices} from "../utils.js";
import {ApiError} from "../../errors.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildGetSyncCommitteeDuties(
{chain}: ApiModules,
{notWhileSyncing, waitForNextClosestEpoch}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["getSyncCommitteeDuties"] {
/**
* `POST /eth/v1/validator/duties/sync/{epoch}`
*
* Requests the beacon node to provide a set of sync committee duties for a particular epoch.
* - Although pubkey can be inferred from the index we return it to keep this call analogous with the one that
* fetches attester duties.
* - `sync_committee_index` is the index of the validator in the sync committee. This can be used to infer the
* subnet to which the contribution should be broadcast. Note, there can be multiple per validator.
*
* https://github.com/ethereum/beacon-APIs/pull/134
*
* @param validatorIndices an array of the validator indices for which to obtain the duties.
*/
return async function getSyncCommitteeDuties(epoch, validatorIndices) {
notWhileSyncing();

if (validatorIndices.length === 0) {
throw new ApiError(400, "No validator to get attester duties");
}

// May request for an epoch that's in the future
await waitForNextClosestEpoch();

// sync committee duties have a lookahead of 1 day. Assuming the validator only requests duties for upcoming
// epochs, the head state will very likely have the duties available for the requested epoch.
// Note: does not support requesting past duties
const head = chain.forkChoice.getHead();
const state = chain.getHeadState();

// Check that all validatorIndex belong to the state before calling getCommitteeAssignments()
const pubkeys = getPubkeysForIndices(state.validators, validatorIndices);
// Ensures `epoch // EPOCHS_PER_SYNC_COMMITTEE_PERIOD <= current_epoch // EPOCHS_PER_SYNC_COMMITTEE_PERIOD + 1`
const syncCommitteeCache = state.epochCtx.getIndexedSyncCommitteeAtEpoch(epoch);
const syncCommitteeValidatorIndexMap = syncCommitteeCache.validatorIndexMap;

const duties: routes.validator.SyncDuty[] = [];
for (let i = 0, len = validatorIndices.length; i < len; i++) {
const validatorIndex = validatorIndices[i];
const validatorSyncCommitteeIndices = syncCommitteeValidatorIndexMap.get(validatorIndex);
if (validatorSyncCommitteeIndices) {
duties.push({
pubkey: pubkeys[i],
validatorIndex,
validatorSyncCommitteeIndices,
});
}
}

return {
data: duties,
executionOptimistic: isOptimisticBlock(head),
};
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import {ServerApi, routes} from "@lodestar/api";
import {ApiModules} from "../../types.js";
import {computeSubnetForCommitteesAtSlot} from "../utils.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildPrepareBeaconCommitteeSubnet(
{metrics, network}: ApiModules,
{notWhileSyncing}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["prepareBeaconCommitteeSubnet"] {
return async function prepareBeaconCommitteeSubnet(subscriptions) {
notWhileSyncing();

await network.prepareBeaconCommitteeSubnets(
subscriptions.map(({validatorIndex, slot, isAggregator, committeesAtSlot, committeeIndex}) => ({
validatorIndex: validatorIndex,
subnet: computeSubnetForCommitteesAtSlot(slot, committeesAtSlot, committeeIndex),
slot: slot,
isAggregator: isAggregator,
}))
);

// TODO:
// If the discovery mechanism isn't disabled, attempt to set up a peer discovery for the
// required subnets.

if (metrics) {
for (const subscription of subscriptions) {
metrics.registerLocalValidator(subscription.validatorIndex);
}
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {ServerApi, routes} from "@lodestar/api";
import {ApiModules} from "../../types.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildPrepareBeaconProposer(
{chain}: ApiModules,
_deps: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["prepareBeaconProposer"] {
return async function prepareBeaconProposer(proposers) {
await chain.updateBeaconProposerData(chain.clock.currentEpoch, proposers);
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import {ServerApi, routes} from "@lodestar/api";
import {SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {CommitteeSubscription} from "../../../../network/subnets/interface.js";
import {ApiModules} from "../../types.js";
import {ValidatorEndpointDependencies} from "./types.js";

export function buildPrepareSyncCommitteeSubnets(
{network, metrics}: ApiModules,
{notWhileSyncing}: ValidatorEndpointDependencies
): ServerApi<routes.validator.Api>["prepareSyncCommitteeSubnets"] {
/**
* POST `/eth/v1/validator/sync_committee_subscriptions`
*
* Subscribe to a number of sync committee subnets.
* Sync committees are not present in phase0, but are required for Altair networks.
* Subscribing to sync committee subnets is an action performed by VC to enable network participation in Altair networks,
* and only required if the VC has an active validator in an active sync committee.
*
* https://github.com/ethereum/beacon-APIs/pull/136
*/
return async function prepareSyncCommitteeSubnets(subscriptions) {
notWhileSyncing();

// A `validatorIndex` can be in multiple subnets, so compute the CommitteeSubscription with double for loop
const subs: CommitteeSubscription[] = [];
for (const sub of subscriptions) {
for (const committeeIndex of sub.syncCommitteeIndices) {
const subnet = Math.floor(committeeIndex / SYNC_COMMITTEE_SUBNET_SIZE);
subs.push({
validatorIndex: sub.validatorIndex,
subnet: subnet,
// Subscribe until the end of `untilEpoch`: https://github.com/ethereum/beacon-APIs/pull/136#issuecomment-840315097
slot: computeStartSlotAtEpoch(sub.untilEpoch + 1),
isAggregator: true,
});
}
}

await network.prepareSyncCommitteeSubnets(subs);

if (metrics) {
for (const subscription of subscriptions) {
metrics.registerLocalValidatorInSyncCommittee(subscription.validatorIndex, subscription.untilEpoch);
}
}
};
}
Loading
Loading